概述
Java開發(fā)中蒲犬,會經(jīng)常使用到多線程橡类,有必要深入了解其實(shí)現(xiàn)原理碗脊;
創(chuàng)建Thread
java.lang.Thread主要的成員變量如下:
private char name[];//線程名稱
private int priority;//優(yōu)先級
private volatile int threadStatus = 0;//線程狀態(tài)
private boolean daemon = false;//是否后臺線程
private Runnable target;//線程執(zhí)行的邏輯
//每個(gè)線程都有一個(gè)ThreadLocalMap的成員變量肋层,類似hashmap
//有興趣深入了解的可以閱讀文章《ThreadLocal源碼閱讀》
ThreadLocal.ThreadLocalMap threadLocals = null;
private long eetop;//實(shí)際上是個(gè)指針奴艾,指向JavaThread的地址
創(chuàng)建Thread對象時(shí)净当,實(shí)際上調(diào)用的是init方法,方法邏輯比較簡單蕴潦,這里就不詳細(xì)介紹了像啼。
start
我們都知道啟動(dòng)線程要調(diào)用start方法,那么start方法里面都做了些什么呢潭苞?
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}
private native void start0();
可以看到主要的邏輯都是通過native方法star0實(shí)現(xiàn)的忽冻, 查看Thread.c文件可以知道,它實(shí)際上調(diào)用的是jvm.cpp文件的JVM_StartThread方法:
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;
bool throw_illegal_thread_state = false;
{
// Threads_lock代表在活動(dòng)線程表上的鎖此疹,MutexLocker會調(diào)用lock方法上鎖
MutexLocker mu(Threads_lock);
//實(shí)際上是判斷java.lang.Thread的eetop,正常情況下僧诚,在后續(xù)步驟中會賦值遮婶,但在此處為0,不指向任何對象;
//其實(shí)在start方法中已經(jīng)根據(jù)threadStatus進(jìn)行了判斷湖笨,但是由于創(chuàng)建線程對象和更新threadStatus并不是原子操作旗扑,因而再次check
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;//狀態(tài)錯(cuò)誤,返回
} else {
//在java.lang.Thread的init方法中赶么,可設(shè)置stack的大小肩豁,此處獲取設(shè)置的大屑勾辫呻;
//不過通常調(diào)用構(gòu)造函數(shù)的時(shí)候都不會傳入stack大小,size=0
jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
size_t sz = size > 0 ? (size_t) size : 0;
//在下面的[創(chuàng)建JavaThread]介紹
native_thread = new JavaThread(&thread_entry, sz);
if (native_thread->osthread() != NULL) {
native_thread->prepare(jthread);
}
}
}
if (throw_illegal_thread_state) {
THROW(vmSymbols::java_lang_IllegalThreadStateException());
}
assert(native_thread != NULL, "Starting null thread?");
//Java線程實(shí)際上是通過系統(tǒng)線程實(shí)現(xiàn)的琼锋,如果創(chuàng)建系統(tǒng)線程失敗放闺,報(bào)錯(cuò);
//有很多原因會導(dǎo)致該錯(cuò)誤:比如內(nèi)存不足缕坎、max user processes設(shè)置過小
if (native_thread->osthread() == NULL) {
delete native_thread;
if (JvmtiExport::should_post_resource_exhausted()) {
JvmtiExport::post_resource_exhausted(
JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
"unable to create new native thread");
}
THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
"unable to create new native thread");
}
//在下面的[創(chuàng)建JavaThread]介紹
Thread::start(native_thread);
JVM_END
創(chuàng)建JavaThread
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
Thread()
#ifndef SERIALGC
, _satb_mark_queue(&_satb_mark_queue_set),
_dirty_card_queue(&_dirty_card_queue_set)
#endif // !SERIALGC
{
if (TraceThreadEvents) {
tty->print_cr("creating thread %p", this);
}
initialize();
_jni_attach_state = _not_attaching_via_jni;
set_entry_point(entry_point);
os::ThreadType thr_type = os::java_thread;
//根據(jù)entry_point判斷是CompilerThread還是JavaThread
//由于此處傳入的為&thread_entry,因此為os::java_thread
thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
os::java_thread;
// os線程有可能創(chuàng)建失敗怖侦,在上文已經(jīng)看到對該場景的處理
os::create_thread(this, thr_type, stack_sz);
_safepoint_visible = false;
}
可以看到JavaThread繼承自Thread,而Thread重載了operator new:
public:
void* operator new(size_t size) { return allocate(size, true); }
void* operator new(size_t size, std::nothrow_t& nothrow_constant) { return allocate(size, false); }
void operator delete(void* p);
protected:
static void* allocate(size_t size, bool throw_excpt, MEMFLAGS flags = mtThread);
關(guān)于operator new描述如下:
operator new可以做為常規(guī)函數(shù)被調(diào)用;在C ++中,new是一個(gè)具有特定行為的操作符:它首先調(diào)用operator new函數(shù),用其類型說明符的大小作為第一個(gè)參數(shù)谜叹,如果調(diào)用成功匾寝,則自動(dòng)初始化或構(gòu)造對象。
allocate定義如下:
void* Thread::allocate(size_t size, bool throw_excpt, MEMFLAGS flags) {
if (UseBiasedLocking) {
//alignment=2<<10,對于偏向鎖荷腊,地址要對齊艳悔,即低10位為0
//根據(jù)偏向鎖的實(shí)現(xiàn),要求線程指向地址的低10位為0女仰,那么該如何實(shí)現(xiàn)呢猜年?
//可以看到此處在申請內(nèi)存的時(shí)候,對原申請大小做了調(diào)整疾忍;
//假設(shè)申請到到地址為0xA11CA(低10位為0111001010),則0xA2000是滿足條件的地址乔外,這兩地址間相差0x0E36(小于1<<10),
//也就是說原申請內(nèi)存大小+alignment,則可以指針后移一罩,找到符合條件的地址;
//那為什么要減去sizeof(intptr_t)?因?yàn)橹羔樅笠谱疃酁閍lignment-1杨幼,即可找到滿足條件的地址;
//而第一位存儲的是_real_malloc_address,占用內(nèi)存空間為sizeof(intptr_t)
//sizeof(intptr_t)是為了跨平臺定義的類型,在64位平臺下為8bytes,32位平臺為4bytes;
const int alignment = markOopDesc::biased_lock_alignment;
size_t aligned_size = size + (alignment - sizeof(intptr_t));
//throw_excpt傳入為true
void* real_malloc_addr = throw_excpt? AllocateHeap(aligned_size, flags, CURRENT_PC)
: os::malloc(aligned_size, flags, CURRENT_PC);
void* aligned_addr = (void*) align_size_up((intptr_t) real_malloc_addr, alignment);
assert(((uintptr_t) aligned_addr + (uintptr_t) size) <=
((uintptr_t) real_malloc_addr + (uintptr_t) aligned_size),
"JavaThread alignment code overflowed allocated storage");
if (TraceBiasedLocking) {
if (aligned_addr != real_malloc_addr)
tty->print_cr("Aligned thread " INTPTR_FORMAT " to " INTPTR_FORMAT,
real_malloc_addr, aligned_addr);
}
((Thread*) aligned_addr)->_real_malloc_address = real_malloc_addr;
return aligned_addr;
} else {
return throw_excpt? AllocateHeap(size, flags, CURRENT_PC)
: os::malloc(size, flags, CURRENT_PC);
}
}
//alignment-1聂渊,再取反推汽,即變成0xfffffffffffff800(低10位為0),該方法的效果相當(dāng)于將低10位變?yōu)?
#define align_size_up_(size, alignment) (((size) + ((alignment) - 1)) & ~((alignment) - 1))
inline intptr_t align_size_up(intptr_t size, intptr_t alignment) {
return align_size_up_(size, alignment);
}
initialize主要是做各種初始化,這邊就不詳細(xì)介紹了歧沪;
JavaThred中有幾個(gè)成員變量比較重要:
//用于synchronized同步塊和Object.wait()
ParkEvent * _ParkEvent ;
//用于Thread.sleep()
ParkEvent * _SleepEvent ;
//用于unsafe.park()/unpark(),供java.util.concurrent.locks.LockSupport調(diào)用歹撒,
//因此它支持了java.util.concurrent的各種鎖、條件變量等線程同步操作,是concurrent的實(shí)現(xiàn)基礎(chǔ)
Parker* _parker;
os::create_thread方法邏輯如下:
bool os::create_thread(Thread* thread, ThreadType thr_type, size_t stack_size) {
assert(thread->osthread() == NULL, "caller responsible");
// 創(chuàng)建OSThread
OSThread* osthread = new OSThread(NULL, NULL);
if (osthread == NULL) {
return false;
}
//設(shè)置線程類型
osthread->set_thread_type(thr_type);
// 初始化狀態(tài)為ALLOCATED
osthread->set_state(ALLOCATED);
thread->set_osthread(osthread);
//linux下可以通過pthread_attr_t來設(shè)置線程屬性
pthread_attr_t attr;
pthread_attr_init(&attr);//linux系統(tǒng)調(diào)用诊胞,更多內(nèi)容請參考內(nèi)核文檔
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
// 線程棧大小
if (os::Linux::supports_variable_stack_size()) {//是否支持設(shè)置棧大小
//如果用戶創(chuàng)建線程時(shí)未指定棧大小,對于JavaThread會看是否設(shè)置了-Xss或ThreadStackSize暖夭;
//如果未設(shè)置锹杈,則采用系統(tǒng)默認(rèn)值。對于64位操作系統(tǒng)迈着,默認(rèn)為1M;
//操作系統(tǒng)棧大薪咄(ulimit -s):這個(gè)配置只影響進(jìn)程的初始線程;后續(xù)用pthread_create創(chuàng)建的線程都可以指定棧大小裕菠。
//HotSpot VM為了能精確控制Java線程的棧大小咬清,特意不使用進(jìn)程的初始線程(primordial thread)作為Java線程
if (stack_size == 0) {
stack_size = os::Linux::default_stack_size(thr_type);
switch (thr_type) {
case os::java_thread:
//讀取Xss和ThreadStackSize
assert (JavaThread::stack_size_at_create() > 0, "this should be set");
stack_size = JavaThread::stack_size_at_create();
break;
case os::compiler_thread:
if (CompilerThreadStackSize > 0) {
stack_size = (size_t)(CompilerThreadStackSize * K);
break;
} // else fall through:
// use VMThreadStackSize if CompilerThreadStackSize is not defined
case os::vm_thread:
case os::pgc_thread:
case os::cgc_thread:
case os::watcher_thread:
if (VMThreadStackSize > 0) stack_size = (size_t)(VMThreadStackSize * K);
break;
}
}
//棧最小為48k
stack_size = MAX2(stack_size, os::Linux::min_stack_allowed);
pthread_attr_setstacksize(&attr, stack_size);
} else {
// let pthread_create() pick the default value.
}
pthread_attr_setguardsize(&attr, os::Linux::default_guard_size(thr_type));
ThreadState state;
{
//如果linux線程而且不支持設(shè)置棧大小,則先獲取創(chuàng)建線程鎖奴潘,獲取鎖之后再創(chuàng)建線程
bool lock = os::Linux::is_LinuxThreads() && !os::Linux::is_floating_stack();
if (lock) {
os::Linux::createThread_lock()->lock_without_safepoint_check();
}
pthread_t tid;
//調(diào)用linux的pthread_create創(chuàng)建線程,傳入4個(gè)參數(shù)
//第一個(gè)參數(shù):指向線程標(biāo)示符pthread_t的指針旧烧;
//第二個(gè)參數(shù):設(shè)置線程的屬性
//第三個(gè)參數(shù):線程運(yùn)行函數(shù)的起始地址
//第四個(gè)參數(shù):運(yùn)行函數(shù)的參數(shù)
int ret = pthread_create(&tid, &attr, (void* (*)(void*)) java_start, thread);
pthread_attr_destroy(&attr);
if (ret != 0) {//創(chuàng)建失敗,做清理工作
if (PrintMiscellaneous && (Verbose || WizardMode)) {
perror("pthread_create()");
}
// Need to clean up stuff we've allocated so far
thread->set_osthread(NULL);
delete osthread;
if (lock) os::Linux::createThread_lock()->unlock();
return false;
}
// 將pthread id保存到osthread
osthread->set_pthread_id(tid);
// 等待pthread_create創(chuàng)建的子線程完成初始化或放棄
{
Monitor* sync_with_child = osthread->startThread_lock();
MutexLockerEx ml(sync_with_child, Mutex::_no_safepoint_check_flag);
while ((state = osthread->get_state()) == ALLOCATED) {
sync_with_child->wait(Mutex::_no_safepoint_check_flag);
}
}
if (lock) {
os::Linux::createThread_lock()->unlock();
}
}
// Aborted due to thread limit being reached
if (state == ZOMBIE) {
thread->set_osthread(NULL);
delete osthread;
return false;
}
// The thread is returned suspended (in state INITIALIZED),
// and is started higher up in the call chain
assert(state == INITIALIZED, "race condition");
return true;
}
創(chuàng)建線程時(shí)傳入了java_start,做為線程運(yùn)行函數(shù)的初始地址:
static void *java_start(Thread *thread) {
static int counter = 0;
int pid = os::current_process_id();
//alloca是用來分配存儲空間的,它和malloc的區(qū)別是它是在當(dāng)前函數(shù)的棧上分配存儲空間画髓,而不是在堆中掘剪。
//其優(yōu)點(diǎn)是:當(dāng)函數(shù)返回時(shí),自動(dòng)釋放它所使用的棧奈虾。
alloca(((pid ^ counter++) & 7) * 128);
ThreadLocalStorage::set_thread(thread);
OSThread* osthread = thread->osthread();
Monitor* sync = osthread->startThread_lock();
// non floating stack LinuxThreads needs extra check, see above
if (!_thread_safety_check(thread)) {
// notify parent thread
MutexLockerEx ml(sync, Mutex::_no_safepoint_check_flag);
osthread->set_state(ZOMBIE);
sync->notify_all();
return NULL;
}
// thread_id is kernel thread id (similar to Solaris LWP id)
osthread->set_thread_id(os::Linux::gettid());
//優(yōu)先嘗試在請求線程當(dāng)前所處的CPU的Local內(nèi)存上分配空間夺谁。
//如果local內(nèi)存不足,優(yōu)先淘汰local內(nèi)存中無用的Page
if (UseNUMA) {//默認(rèn)為false
int lgrp_id = os::numa_get_group_id();
if (lgrp_id != -1) {
thread->set_lgrp_id(lgrp_id);
}
}
// 調(diào)用pthread_sigmask初始化signal mask:VM線程處理BREAK_SIGNAL信號
os::Linux::hotspot_sigmask(thread);
// initialize floating point control register
os::Linux::init_thread_fpu_state();
// handshaking with parent thread
{
MutexLockerEx ml(sync, Mutex::_no_safepoint_check_flag);
// 設(shè)置狀態(tài)會INITIALIZED,并通過notify_all喚醒父線程
osthread->set_state(INITIALIZED);
sync->notify_all();
// 一直等待父線程調(diào)用 os::start_thread()
while (osthread->get_state() == INITIALIZED) {
sync->wait(Mutex::_no_safepoint_check_flag);
}
}
// call one more level start routine
thread->run();
return 0;
}
當(dāng)子線程完成初始化肉微,將狀態(tài)設(shè)置為NITIALIZED并喚醒父線程之后匾鸥,父線程會執(zhí)行Thread::start方法:
void Thread::start(Thread* thread) {
trace("start", thread);
if (!DisableStartThread) {
if (thread->is_Java_thread()) {//設(shè)置線程狀態(tài)為RUNNABLE
java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
java_lang_Thread::RUNNABLE);
}
os::start_thread(thread);
}
}
void os::start_thread(Thread* thread) {
MutexLockerEx ml(thread->SR_lock(), Mutex::_no_safepoint_check_flag);
OSThread* osthread = thread->osthread();
//設(shè)置線程狀態(tài)為RUNNABLE, 子線程可以開始執(zhí)行thread->run()
osthread->set_state(RUNNABLE);
pd_start_thread(thread);
}
void os::pd_start_thread(Thread* thread) {
OSThread * osthread = thread->osthread();
assert(osthread->get_state() != INITIALIZED, "just checking");
Monitor* sync_with_child = osthread->startThread_lock();
MutexLockerEx ml(sync_with_child, Mutex::_no_safepoint_check_flag);
sync_with_child->notify();//父線程會調(diào)用thread->run();
}
父線程的thread->run主要邏輯為調(diào)用thread_main_inner,源碼如下:
void JavaThread::thread_main_inner() {//刪除部分非關(guān)鍵代碼
if (!this->has_pending_exception() &&
!java_lang_Thread::is_stillborn(this->threadObj())) {
{
ResourceMark rm(this);
this->set_native_thread_name(this->get_thread_name());
}
HandleMark hm(this);
this->entry_point()(this, this);
}
this->exit(false);
delete this;
}
那這兒的entry_point是什么呢?實(shí)際上在創(chuàng)建JavaThread時(shí)碉纳,會傳入entrypoint:
static void thread_entry(JavaThread* thread, TRAPS) {
HandleMark hm(THREAD);
Handle obj(THREAD, thread->threadObj());
JavaValue result(T_VOID);
JavaCalls::call_virtual(&result,
obj,
KlassHandle(THREAD, SystemDictionary::Thread_klass()),
vmSymbols::run_method_name(),
vmSymbols::void_method_signature(),
THREAD);
}
可以看到實(shí)際上就是調(diào)用java.lang.Thread的run方法;
另外當(dāng)run方法執(zhí)行結(jié)束,會調(diào)用JavaThread::exit方法清理資源