北京大學(xué)(中國)校訓(xùn):“自由平等,民主科學(xué)版保∥匦Γ”
話說,所有不談源碼的開發(fā)講解都是瞎扯淡彻犁。今天叫胁,陰雨綿綿,心中暑氣全無汞幢。要不一起看看Thread類的源碼驼鹅,jdk的源碼倒是很簡單的,這個類中大量重要方法是native方法森篷,在jvm中實(shí)現(xiàn)的输钩,那可不是看Java代碼了,有c基礎(chǔ)的仲智,那就沒多大問題了买乃,只要能夠看到大概意思,我覺得對開發(fā)者來說足夠用了钓辆,不至于有哪家公司招你來把jvm中的代碼改下,哈哈。。孙援。
今天需要查看Thread中的jvm源碼竟稳,jdk中的方法與jvm源碼方法是具有對應(yīng)關(guān)系的,通俗理解為注冊表,具體可在openjdk\jdk\src\share\native\java\lang\Thread.c,現(xiàn)摘出部分映射關(guān)系:
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
一 線程創(chuàng)建與初始化
下面的代碼片段已經(jīng)加了詳細(xì)的注釋混聊,在此就不說了咳胃,直接上干貨吧
//名稱
private volatile String name;
//優(yōu)先級
private int priority;
//守護(hù)線程標(biāo)識
private boolean daemon = false;
//線程執(zhí)行的目標(biāo)對象
private Runnable target;
//線程組
private ThreadGroup group;
//當(dāng)前線程的指定棧大小存崖,默認(rèn)值為0演顾,設(shè)置似乎意義不大胎源,具體棧分配由jvm決定
private long stackSize;
//線程序列號,為0
private static long threadSeqNumber;
//線程id:由threadSeqNumber++生成
private long tid;
//標(biāo)識線程狀態(tài)奢驯,默認(rèn)是線程未啟動
/**
* 線程狀態(tài)有如下幾種:NEW RUNNABLE WAITING TIMED_WAITING TERMINATED
* NEW時對應(yīng)的threadStatus為0;
* */
private int threadStatus = 0;
//存儲當(dāng)前線程的局部變量
ThreadLocal.ThreadLocalMap threadLocals = null;
/**
* 在創(chuàng)建子線程時豁跑,子線程會接收所有可繼承的線程局部變量的初始值卸夕,以獲得父線程所具有的值
* 為子線程提供從父線程那里繼承的值
* */
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//為LockSupport提供的變量贡羔,具體可查看LockSupport源碼
volatile Object parkBlocker;
//阻塞器鎖,主要用于處理阻塞情況
private volatile Interruptible blocker;
//阻斷鎖
private Object blockerLock = new Object();
//最低優(yōu)先級
public final static int MIN_PRIORITY = 1;
//默認(rèn)優(yōu)先級
public final static int NORM_PRIORITY = 5;
//最高優(yōu)先級
public final static int MAX_PRIORITY = 10;
/**
* Thread有多種構(gòu)造器痊班,這里只列出最全的構(gòu)造方法
* 所有構(gòu)造器均調(diào)用init方法
* */
public Thread(ThreadGroup group, Runnable target, String name, long stackSize) {
init(group, target, name, stackSize, null, true);
}
/**
* 底層init方法,用于初始化thread對象
* 參數(shù)已在上述屬性中做了說明
* */
private void init(ThreadGroup g, Runnable target, String name, long stackSize,
AccessControlContext acc, boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name;
/**
* 獲取當(dāng)前線程摹量,即創(chuàng)建線程的線程
* 這里體現(xiàn)了父子線程的關(guān)系
* */
Thread parent = currentThread();
//獲得系統(tǒng)的安全管理器
SecurityManager security = System.getSecurityManager();
if (g == null) {
if (security != null) {
g = security.getThreadGroup();
}
//設(shè)置線程組涤伐,如果子線程未指定,則取父線程的
if (g == null) {
g = parent.getThreadGroup();
}
}
g.checkAccess();
//線程組未啟動線程個數(shù)++
g.addUnstarted();
//線程組
this.group = g;
//守護(hù)線程繼承性缨称,子線程的是否守護(hù)取決于父線程
this.daemon = parent.isDaemon();
//優(yōu)先級繼承性凝果,子線程的優(yōu)先級取決于父線程
this.priority = parent.getPriority();
//Runnable對象
this.target = target;
//設(shè)置優(yōu)先級
setPriority(priority);
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
//為子線程提供從父線程那里繼承的值
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
this.stackSize = stackSize;
//生成線程ID
tid = nextThreadID();
}
//tid生成器
private static synchronized long nextThreadID() {
return ++threadSeqNumber;
}
public final ThreadGroup getThreadGroup() {
return this.group;
}
public final boolean isDaemon() {
return this.daemon;
}
public final int getPriority() {
return this.priority;
}
public final void setPriority(int priority) {
if (priority <= 10 && priority >= 1) {
ThreadGroup var2;
if ((var2 = this.getThreadGroup()) != null) {
//這里需要注意:線程的優(yōu)先級上限取決于所屬線程組的優(yōu)先級
if (priority > var2.getMaxPriority()) {
priority = var2.getMaxPriority();
}
this.setPriority0(this.priority = priority);
}
} else {
throw new IllegalArgumentException();
}
}
/**
* 線程執(zhí)行的具體任務(wù)
*/
public void run() {
if (target != null) {
target.run();
}
}
/**
* 線程真正退出前執(zhí)行清理
*/
private void exit() {
if (group != null) {
group = null;
}
target = null;
threadLocals = null;
inheritableThreadLocals = null;
blocker = null;
}
//獲取當(dāng)前線程的native方法
public static native Thread currentThread();
//設(shè)置線程優(yōu)先級的native方法
private native void setPriority0(int var1);
在上述有兩個native方法,即currentThread()和setPriority0()睦尽,其中currentThread是獲取父線程器净,setPriority0是設(shè)置線程優(yōu)先級,均在jvm.cpp中当凡,點(diǎn)擊查看山害;現(xiàn)摘出具體片段:
currentThread方法底層調(diào)用jvm.cpp中的JVM_CurrentThread函數(shù):
JVM_ENTRY(jobject, JVM_CurrentThread(JNIEnv* env, jclass threadClass))
JVMWrapper("JVM_CurrentThread");
oop jthread = thread->threadObj();
assert (thread != NULL, "no current thread!");
return JNIHandles::make_local(env, jthread);
JVM_END
setPriority0方法底層調(diào)用jvm.cpp中的JVM_CurrentThread函數(shù):
JVM_ENTRY(void, JVM_SetThreadPriority(JNIEnv* env, jobject jthread, jint prio))
JVMWrapper("JVM_SetThreadPriority");
// 確保C++線程和OS線程在操作之前不釋放
MutexLocker ml(Threads_lock);
oop java_thread = JNIHandles::resolve_non_null(jthread);
java_lang_Thread::set_priority(java_thread, (ThreadPriority)prio);
JavaThread* thr = java_lang_Thread::thread(java_thread);
if (thr != NULL) {
// 線程尚未啟動,當(dāng)設(shè)置優(yōu)先級才會啟動
Thread::set_priority(thr, (ThreadPriority)prio);
}
JVM_END
二 線程啟動start
start代碼片段如下:
/**
* 調(diào)用start()方法啟動線程沿量,執(zhí)行線程的run方法
*/
public synchronized void start() {
/**
* 線程狀態(tài)校驗(yàn)浪慌,線程必須是0即新建態(tài)才能啟動
* 這也是為何一個線程連續(xù)兩次調(diào)start會報錯
*/
if (threadStatus != 0) throw new IllegalThreadStateException();
//通知線程組當(dāng)前線程即將執(zhí)行,同時線程組中未啟動線程數(shù)-1
group.add(this);
boolean started = false;
try {
//使線程進(jìn)入可執(zhí)行(runnable)狀態(tài)
start0();
started = true;
} finally {
try {
if (!started) {
//啟動失敗后朴则,修改線程組未啟動線程數(shù)+1
group.threadStartFailed(this);
}
} catch (Throwable ignore) { }
}
}
/**
* 設(shè)置線程啟動的native方法
* 底層會新啟動一個線程权纤,新線程才會調(diào)用傳遞過來的Runnable對象run方法
* */
private native void start0();
start0方法底層調(diào)用jvm.cpp中的JVM_StartThread函數(shù):
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;
//由于排序問題,引發(fā)異常時無法持有線程鎖乌妒。示例:我們可能需要在構(gòu)造異常時獲取堆鎖汹想。
bool throw_illegal_thread_state = false;
//在線程start中發(fā)布jvmti事件之前,必須釋放線程鎖芥被。
{
//確保C++線程和OS線程在操作之前沒有被釋放欧宜。
MutexLocker mu(Threads_lock);
//自JDK5以來,線程的threadstatus用于防止重新啟動已啟動的線程拴魄,所以通常會發(fā)現(xiàn)javathread是空的。
//但是對于JNI附加的線程匹中,在創(chuàng)建的線程對象(及其JavaThread集合)和對其ThreadStates的更新之間有一個小窗口,
//因此我們必須檢查這個窗口
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
//我們還可以檢查stillborn標(biāo)志顶捷,看看這個線程是否已經(jīng)停止,但是出于歷史原因葵蒂,我們讓線程在它開始運(yùn)行時檢測它自己
jlong size = java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
//分配C++線程結(jié)構(gòu)并創(chuàng)建本地線程,從Java中傳遞過來的stack size已經(jīng)被聲明秦士,
//但是構(gòu)造函數(shù)采用size_t(無符號類型),因此避免傳遞負(fù)值
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz);
//此時可能由于內(nèi)存不足而沒有為javathread創(chuàng)建Osthread隧土。
//檢查這種情況并拋出異常。最后命爬,我們可能希望更改此項(xiàng)曹傀,以便僅在成功創(chuàng)建線程時獲取鎖,
//然后我們還可以執(zhí)行此檢查并在JavaThread構(gòu)造函數(shù)中拋出異常饲宛。
if (native_thread->osthread() != NULL) {
//注意:當(dāng)前線程未在“準(zhǔn)備”階段使用
native_thread->prepare(jthread);
}
}
}
if (throw_illegal_thread_state) {
THROW(vmSymbols::java_lang_IllegalThreadStateException());
}
assert(native_thread != NULL, "Starting null thread?");
if (native_thread->osthread() == NULL) {
// No one should hold a reference to the 'native_thread'.
delete native_thread;
if (JvmtiExport::should_post_resource_exhausted()) {
JvmtiExport::post_resource_exhausted(
JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
"unable to create new native thread");
}
THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
"unable to create new native thread");
}
Thread::start(native_thread);
JVM_END
三 線程中斷判斷
/**
* 判斷線程是否已經(jīng)中斷皆愉,同時清除中斷標(biāo)識
* static方法,
*/
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
/**
* 判斷線程是否已經(jīng)中斷落萎,不清除中斷標(biāo)識
* this代表當(dāng)前調(diào)用此方法的線程對象
*/
public boolean isInterrupted() {
return this.isInterrupted(false);
}
/**
* native方法判斷線程是否中斷
*/
private native boolean isInterrupted(boolean ClearInterrupted);
isInterrupted方法底層調(diào)用jvm.cpp中的JVM_IsInterrupted函數(shù):
JVM_QUICK_ENTRY(jboolean, JVM_IsInterrupted(JNIEnv* env, jobject jthread, jboolean clear_interrupted))
JVMWrapper("JVM_IsInterrupted");
//確保C++線程和OS線程在操作之前沒有被釋放
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
//我們需要重新解析java_thread亥啦,因?yàn)樵讷@取鎖的過程中可能會發(fā)生GC
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr == NULL) {
return JNI_FALSE;
} else {
return (jboolean) Thread::is_interrupted(thr, clear_interrupted != 0);
}
JVM_END
四 線程join
/**
* 等待調(diào)用join的線程執(zhí)行結(jié)束
*/
public final synchronized void join(long var1) throws InterruptedException {
long var3 = System.currentTimeMillis();
long var5 = 0L;
if (var1 < 0L) {
throw new IllegalArgumentException("timeout value is negative");
} else {
//如果join時不設(shè)置超時,則會調(diào)用Object.wait的無超時等待
if (var1 == 0L) {
while(this.isAlive()) {
this.wait(0L);
}
} else {
//join設(shè)置超時练链,則會調(diào)用Object.wait的超時等待
while(this.isAlive()) {
long var7 = var1 - var5;
if (var7 <= 0L) {
break;
}
this.wait(var7);
var5 = System.currentTimeMillis() - var3;
}
}
}
}
/**
* native方法判斷線程存活
*/
public final native boolean isAlive();
Object.wait在下面講述翔脱,isAlive方法底層調(diào)用jvm.cpp中的JVM_IsThreadAlive函數(shù):
JVM_ENTRY(jboolean, JVM_IsThreadAlive(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_IsThreadAlive");
oop thread_oop = JNIHandles::resolve_non_null(jthread);
return java_lang_Thread::is_alive(thread_oop);
JVM_END
五 線程sleep
/**
* 線程休眠
* @param var0 毫秒
* @param var2 納秒
*/
public static void sleep(long var0, int var2) throws InterruptedException {
if (var0 < 0L) {
throw new IllegalArgumentException("timeout value is negative");
} else if (var2 >= 0 && var2 <= 999999) {
//納秒四舍五入
if (var2 >= 500000 || var2 != 0 && var0 == 0L) {
++var0;
}
sleep(var0);
} else {
throw new IllegalArgumentException("nanosecond timeout value out of range");
}
}
/**
* native方法線程休眠
*/
public static native void sleep(long var0) throws InterruptedException;
sleep方法底層調(diào)用jvm.cpp中的JVM_Sleep函數(shù):
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");
if (millis < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
//線程中斷則拋出異常
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
//保存當(dāng)前線程狀態(tài)并在末尾還原它,并將新線程狀態(tài)設(shè)置為SLEEPING
JavaThreadSleepState jtss(thread);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__begin, millis);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_BEGIN(
millis);
#endif /* USDT2 */
EventThreadSleep event;
if (millis == 0) {
//當(dāng)convertsleeptoyield為on時媒鼓,這與JVM_Sleep的經(jīng)典VM實(shí)現(xiàn)相匹配届吁。
//對于類似的線程行為(win32)至關(guān)重要,即在某些GUI上下文中绿鸣,對Solaris進(jìn)行短時間睡眠是有益的疚沐。
if (ConvertSleepToYield) {
os::yield();
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
os::sleep(thread, MinSleepInterval, false);
thread->osthread()->set_state(old_state);
}
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
if (os::sleep(thread, millis, true) == OS_INTRPT) {
//當(dāng)休眠時,一個異步異常(例如潮模,threaddeathexception)可能拋出了亮蛔,但不需要覆蓋它們。
if (!HAS_PENDING_EXCEPTION) {
if (event.should_commit()) {
event.set_time(millis);
event.commit();
}
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_END(
1);
#endif /* USDT2 */
//THROW_MSG方法返回擎厢,意味著不能以正確地還原線程狀態(tài)究流,因?yàn)槟呛芸赡苁清e的。
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
}
thread->osthread()->set_state(old_state);
}
if (event.should_commit()) {
event.set_time(millis);
event.commit();
}
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_END(
0);
#endif /* USDT2 */
JVM_END
六 線程yield
/**
* native方法線程讓度CPU執(zhí)行權(quán)
*/
public static native void yield();
yield方法底層調(diào)用jvm.cpp中的JVM_Yield函數(shù):
JVM_ENTRY(void, JVM_Yield(JNIEnv *env, jclass threadClass))
JVMWrapper("JVM_Yield");
if (os::dont_yield()) return;
#ifndef USDT2
HS_DTRACE_PROBE0(hotspot, thread__yield);
#else /* USDT2 */
HOTSPOT_THREAD_YIELD();
#endif /* USDT2 */
//當(dāng)ConvertYieldToSleep為off(默認(rèn))時动遭,這與傳統(tǒng)的VM使用yield相匹配,對于類似的線程行為至關(guān)重要
if (ConvertYieldToSleep) {//on
//系統(tǒng)調(diào)用sleep
os::sleep(thread, MinSleepInterval, false);
} else {//off
//系統(tǒng)調(diào)用yield
os::yield();
}
JVM_END
七 線程中斷interrupt
/**
* 線程中斷
*/
public void interrupt() {
Object var1 = this.blockerLock;
synchronized(this.blockerLock) {
Interruptible var2 = this.blocker;
if (var2 != null) {
this.interrupt0();
var2.interrupt(this);
return;
}
}
this.interrupt0();
}
/**
* native方法線程中斷
*/
private native void interrupt0();
interrupt0方法底層調(diào)用jvm.cpp中的JVM_Interrupt函數(shù):
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
//確保C++線程和OS線程在操作之前沒有被釋放
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
//我們需要重新解析java_thread,因?yàn)樵讷@取鎖的過程中可能會發(fā)生GC
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END
七 Object的Wait/Notify/NotifyAll
/**
* 線程等待
* @param var1 毫秒
* @param var3 納秒
*/
public final void wait(long var1, int var3) throws InterruptedException {
if (var1 < 0L) {
throw new IllegalArgumentException("timeout value is negative");
} else if (var3 >= 0 && var3 <= 999999) {
//納秒>0酝静,毫秒直接++
if (var3 > 0) {
++var1;
}
//調(diào)用native方法
this.wait(var1);
} else {
throw new IllegalArgumentException("nanosecond timeout value out of range");
}
}
/**
* native方法線程等待
*/
public final native void wait(long var1) throws InterruptedException;
/**
* native方法線程單個喚醒
*/
public final native void notify();
/**
* native方法線程喚醒等待池中所有線程
*/
public final native void notifyAll();
Wait/Notify/NotifyAll在objectMonitor.cpp中全跨,點(diǎn)擊查看;
Wait片段:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;
assert(Self->is_Java_thread(), "Must be Java thread!");
JavaThread *jt = (JavaThread *)THREAD;
DeferredInitialize () ;
// Throw IMSX or IEX.
CHECK_OWNER();
//調(diào)用is_interrupted()判斷并清除線程中斷狀態(tài)渺杉,如果中斷狀態(tài)為true是越,拋出中斷異常并結(jié)束
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
//post monitor waited event
//注意這是過去式倚评,已經(jīng)等待完了
if (JvmtiExport::should_post_monitor_waited()) {
//注意:這里傳遞參數(shù)'false'天梧,這是因?yàn)橛捎诰€程中斷呢岗,等待不會超時
JvmtiExport::post_monitor_waited(jt, this, false);
}
TEVENT (Wait - Throw IEX) ;
THROW(vmSymbols::java_lang_InterruptedException());
return ;
}
TEVENT (Wait) ;
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);
// create a node to be put into the queue
// Critically, after we reset() the event but prior to park(), we must check
// for a pending interrupt.
//創(chuàng)建一個node放入隊(duì)列
//關(guān)鍵是悉尾,在reset()之后挫酿,但在park()之前早龟,必須檢查是否有掛起的中斷
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence();
//在本例中等待隊(duì)列是一個循環(huán)的雙向鏈表拄衰,但它也可以是一個優(yōu)先級隊(duì)列或任何數(shù)據(jù)結(jié)構(gòu)翘悉。
//_WaitSetLock保護(hù)著等待隊(duì)列.
//通常,等待隊(duì)列只能由監(jiān)視器*except*的所有者訪問轮洋,但在park()因中斷超時而返回的情況下也是可以弊予。
//競爭非常小汉柒,所以使用一個自旋鎖而不是重量級的阻塞鎖碾褂。
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
intptr_t save = _recursions; // 記錄舊的遞歸次數(shù)
_waiters++; // waiters 自增
_recursions = 0; // 設(shè)置 recursion level to be 1
exit (Self) ; // 退出監(jiān)視器
guarantee (_owner != Self, "invariant") ;
//一旦在上面的exit()調(diào)用中刪除了ObjectMonitor的所有權(quán)正塌,
//另一個線程就可以進(jìn)入ObjectMonitor乓诽,執(zhí)行notify()和exit()對象監(jiān)視器鸠天。
//如果另一個線程的exit()調(diào)用選擇此線程作為后繼者粮宛,并且此線程在發(fā)布MONITOR_CONTENDED_EXIT時發(fā)生unpark()調(diào)用巍杈,
//則我們使用RawMonitors運(yùn)行事件風(fēng)險處理筷畦,并使用unpark().
//為了避免這個問題鳖宾,我們重新發(fā)布事件鼎文,即使未使用原來的unpark()因俐,
//這也不會造成任何傷害,因?yàn)橐呀?jīng)為此監(jiān)視器選好了繼任者蓉坎。
if (node._notified != 0 && _succ == Self) {
node._event->unpark();
}
// The thread is on the WaitSet list - now park() it.
// On MP systems it's conceivable that a brief spin before we park
// could be profitable.
//
// TODO-FIXME: change the following logic to a loop of the form
// while (!timeout && !interrupted && _notified == 0) park()
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
//線程處于阻塞狀態(tài)蛉艾,并且oop訪問是不安全的
jt->set_suspend_equivalent();
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty 空處理
} else
if (node._notified == 0) {
if (millis <= 0) {
// 調(diào)用park()方法阻塞線程
Self->_ParkEvent->park () ;
} else {
// 調(diào)用park()方法在超時時間內(nèi)阻塞線程
ret = Self->_ParkEvent->park (millis) ;
}
}
// were we externally suspended while we were waiting?
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.
jt->java_suspend_self();
}
} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
//當(dāng)線程不在等待隊(duì)列時勿侯,使用雙重檢查鎖定避免獲取_WaitSetLock
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN ;
}
Thread::SpinRelease (&_WaitSetLock) ;
}
//從這個線程的角度來看罐监,Node's TState是穩(wěn)定的,
//沒有其他線程能夠異步修改TState
guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
OrderAccess::loadload() ;
if (_succ == Self) _succ = NULL ;
WasNotified = node._notified ;
// Reentry phase -- reacquire the monitor.
// re-enter contended(競爭) monitor after object.wait().
// retain OBJECT_WAIT state until re-enter successfully completes
// Thread state is thread_in_vm and oop access is again safe,
// although the raw address of the object may have changed.
// (Don't cache naked oops over safepoints, of course).
// post monitor waited event.
//注意這是過去式侧但,已經(jīng)等待完了
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
}
OrderAccess::fence() ;
assert (Self->_Stalled != 0, "invariant") ;
Self->_Stalled = 0 ;
assert (_owner != Self, "invariant") ;
ObjectWaiter::TStates v = node.TState ;
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
ReenterI (Self, &node) ;
node.wait_reenter_end(this);
}
// Self has reacquired the lock.
// Lifecycle - the node representing Self must not appear on any queues.
// Node is about to go out-of-scope, but even if it were immortal(長久的) we wouldn't
// want residual(殘留的) elements associated with this thread left on any lists.
guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
assert (_owner == Self, "invariant") ;
assert (_succ != Self , "invariant") ;
} // OSThreadWaitState()
jt->set_current_waiting_monitor(NULL);
guarantee (_recursions == 0, "invariant") ;
_recursions = save; // restore the old recursion count
_waiters--; // decrement the number of waiters
// Verify a few postconditions
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
if (SyncFlags & 32) {
OrderAccess::fence() ;
}
//檢查是否有通知notify發(fā)生
// 從park()方法返回后,判斷是否是因?yàn)橹袛喾祷匕爻俅握{(diào)用
// thread::is_interrupted(Self, true)判斷并清除線程中斷狀態(tài)
// 如果中斷狀態(tài)為true趾娃,拋出中斷異常并結(jié)束抬闷。
if (!WasNotified) {
// no, it could be timeout or Thread.interrupt() or both
// check for interrupt event, otherwise it is timeout
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
TEVENT (Wait - throw IEX from epilog) ;
THROW(vmSymbols::java_lang_InterruptedException());
}
}
//注意:虛假喚醒將被視為超時笤成;監(jiān)視器通知優(yōu)先于線程中斷炕泳。
}
Notify片段:
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER();
if (_WaitSet == NULL) {
TEVENT (Empty-Notify) ;
return ;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
ObjectWaiter * iterator = DequeueWaiter() ;
if (iterator != NULL) {
TEVENT (Notify1 - Transfer) ;
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
iterator->_notified = 1 ;
ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
if (Policy == 0) { // prepend(預(yù)追加) to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append(真正追加) to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
//考慮:當(dāng)前獲取EntryList的tail需要遍歷整個鏈表
//將tail訪問轉(zhuǎn)換為CDLL而不是使用當(dāng)前的DLL培遵,從而使訪問時間固定荤懂。
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { // prepend to cxq
// prepend(預(yù)追加) to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else
if (Policy == 3) { // append(真正追加) to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
if (Policy < 4) {
iterator->wait_reenter_begin(this);
}
// _WaitSetLock protects the wait queue, not the EntryList. We could
// move the add-to-EntryList operation, above, outside the critical section
// protected by _WaitSetLock. In practice that's not useful. With the
// exception of wait() timeouts and interrupts the monitor owner
// is the only thread that grabs _WaitSetLock. There's almost no contention
// on _WaitSetLock so it's not profitable to reduce the length of the
// critical section.
}
Thread::SpinRelease (&_WaitSetLock) ;
if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
ObjectMonitor::_sync_Notifications->inc() ;
}
}
NotifyAll片段:
void ObjectMonitor::notifyAll(TRAPS) {
CHECK_OWNER();
ObjectWaiter* iterator;
if (_WaitSet == NULL) {
TEVENT (Empty-NotifyAll) ;
return ;
}
DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
int Tally = 0 ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;
for (;;) {
iterator = DequeueWaiter () ;
if (iterator == NULL) break ;
TEVENT (NotifyAll - Transfer1) ;
++Tally ;
// Disposition - what might we do with iterator ?
// a. add it directly to the EntryList - either tail or head.
// b. push it onto the front of the _cxq.
// For now we use (a).
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
iterator->_notified = 1 ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
if (Policy == 0) { // prepend to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
// CONSIDER: finding the tail currently requires a linear-time walk of
// the EntryList. We can make tail access constant-time by converting to
// a CDLL instead of using our current DLL.
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { // prepend to cxq
// prepend to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
} else
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
if (Policy < 4) {
iterator->wait_reenter_begin(this);
}
// _WaitSetLock protects the wait queue, not the EntryList. We could
// move the add-to-EntryList operation, above, outside the critical section
// protected by _WaitSetLock. In practice that's not useful. With the
// exception of wait() timeouts and interrupts the monitor owner
// is the only thread that grabs _WaitSetLock. There's almost no contention
// on _WaitSetLock so it's not profitable to reduce the length of the
// critical section.
}
Thread::SpinRelease (&_WaitSetLock) ;
if (Tally != 0 && ObjectMonitor::_sync_Notifications != NULL) {
ObjectMonitor::_sync_Notifications->inc(Tally) ;
}
}
特此聲明:
分享文章有完整的知識架構(gòu)圖矾瘾,將從以下幾個方面系統(tǒng)展開:
1 基礎(chǔ)(Linux/Spring boot/并發(fā))
2 性能調(diào)優(yōu)(jvm/tomcat/mysql)
3 高并發(fā)分布式
4 微服務(wù)體系
如果您覺得文章不錯壕翩,請關(guān)注阿倫故事放妈,您的支持是我堅(jiān)持的莫大動力芜抒,在此受小弟一拜宅倒!
每篇福利: