??????在以前可以通過thread.stop()可以讓一個線程去停止另一個線程,但這種方法太暴力或渤,突然間停止其他線程會導(dǎo)致被停止的線程無法完成一些清理工作系冗,所以stop()已經(jīng)被拋棄了。
Java線程的終止操作最初是直接暴露給用戶的薪鹦,java.lang.Thread類提供了stop()方法掌敬,允許用戶暴力的終止一個線程并退出臨界區(qū)(釋放所有鎖,并在當(dāng)前調(diào)用棧拋出ThreadDeath Exception)池磁。 同樣的奔害,Thread.suspend()和Thread.resume()方法允許用戶靈活的暫停和恢復(fù)線程。然而這些看似簡便的API在JDK1.2就被deprecate掉了地熄,原因是stop()方法本質(zhì)上是不安全的华临,它會強制釋放掉線程持有的鎖,這樣臨界區(qū)的數(shù)據(jù)中間狀態(tài)就會遺留出來端考,從而造成不可預(yù)知的后果雅潭。
當(dāng)然Java線程不可能沒有辦法終止,在Java程序中却特,唯一的也是最好的辦法就是讓線程從run()方法返回扶供。更具體來說,有以下幾種情況:
- 對于runnable的線程裂明,利用一個變量做標(biāo)記位椿浓,定期檢查
private volatile boolean flag = true;
public void stop() {
flag = false;
}
public void run() {
while (flag) {
//do something...
}
}
- 對于非runnable的線程,應(yīng)該采取中斷的方式退出阻塞,并處理捕獲的中斷異常
- 對于大部分阻塞線程的方法扳碍,使用Thread.interrupt()提岔,可以立刻退出等待,拋出InterruptedException. 這些方法包括Object.wait(), Thread.join()左腔,Thread.sleep()唧垦,以及各種AQS衍生類:Lock.lockInterruptibly()等任何顯示聲明throws InterruptedException的方法。
private volatile Thread thread;
public void stop() {
thread.interrupt();
}
public void run() {
thread = Thread.currentThread();
while (flag) {
try {
Thread.sleep(interval);
} catch (InterruptedException e){
//current thread was interrupted
return;
}
}
}
- 被阻塞的nio Channel也會響應(yīng)interrupt()液样,拋出ClosedByInterruptException振亮,相應(yīng)nio通道需要實現(xiàn)java.nio.channels.InterruptibleChannel接口
private volatile Thread thread;
public void stop() {
thread.interrupt();
}
public void run() {
thread = Thread.currentThread();
BufferedReader in = new BufferedReader(new InputStreamReader(Channels.newInputStream(
new FileInputStream(FileDescriptor.in).getChannel())));
while (flag) {
try {
String line = null;
while ((line = in.readLine()) != null) {
System.out.println("Read line:'"+line+"'");
}
} catch (ClosedByInterruptException e) {
//current channel was interrupted
return;
} catch (IOException e) {
e.printStackTrace();
}
}
}
如果使用的是傳統(tǒng)IO(非Channel,如ServerSocket.accept)鞭莽,所在線程被interrupt時不會拋出ClosedByInterruptException坊秸。但可以使用流的close方法實現(xiàn)退出阻塞。
- 還有一些阻塞方法不會響應(yīng)interrupt澎怒,如等待進入synchronized段褒搔、Lock.lock()。他們不能被動的退出阻塞狀態(tài)喷面。
而Thread.interrupt的作用其實不是中斷線程星瘾,而是通知線程應(yīng)該中斷了,給這個線程發(fā)一個信號惧辈,告訴它琳状,它應(yīng)該結(jié)束了,設(shè)置一個停止標(biāo)志盒齿, 具體到底中斷還是運行念逞,應(yīng)該由被通知的線程自己處理。具體來說,對一個線程边翁,調(diào)用interrupt()時:
- 如果一個線程處于了阻塞狀態(tài)(如線程調(diào)用了thread.sleep翎承、thread.join、thread.wait符匾、1.5中的condition.await叨咖、以及可中斷的通道上的 I/O 操作方法后可進入阻塞狀態(tài)),則在線程在檢查中斷標(biāo)示時如果發(fā)現(xiàn)中斷標(biāo)示為true啊胶,則會在這些阻塞方法(sleep芒澜、join、wait创淡、1.5中的condition.await及可中斷的通道上的 I/O 操作方法)調(diào)用處拋出InterruptedException異常痴晦,
- 如果線程處于正常活動狀態(tài)琳彩,那么會將該線程中的中斷標(biāo)志設(shè)置為true誊酌,僅此而已部凑, 被設(shè)置中斷標(biāo)志的線程將繼續(xù)正常執(zhí)行,不受影響碧浊。
interrupt()并不能真正中斷線程涂邀,需要被調(diào)用的線程自己進行配合才行。一個線程如果有被中斷的需求箱锐,那么就可以這樣做: - 在正常運行任務(wù)時比勉,經(jīng)常檢查本線程的中斷標(biāo)志位,如果被設(shè)置了中斷標(biāo)志驹止,就自行停止線程
- 在調(diào)用阻塞方法時正常處理InterruptException異常浩聋,
hread thread = new Thread(() -> {
while (!Thread.interrupted()) {
// do more work.
}
});
thread.start();
// 一段時間以后
thread.interrupt();
上面代碼中thread.interrupted()清除標(biāo)志位是為了下次繼續(xù)監(jiān)測標(biāo)志位,不然會對被中斷線程的下次運行有影響臊恋。通過interrupt()和.interrupted()方法兩者的配合可以實現(xiàn)正常去停止一個線程衣洁,線程A通過調(diào)用線程B的interrupt方法通知線程B讓它結(jié)束線程,在線程B的run方法內(nèi)部抖仅,通過循環(huán)檢查.interrupted()方法是否為真來接收線程A的信號坊夫,如果為真就可以拋出一個異常,在catch中完成一些清理工作撤卢,然后結(jié)束線程环凿。Thread.interrupted()會清除標(biāo)志位,并不是代表線程又恢復(fù)了放吩,可以理解為僅僅是代表它已經(jīng)響應(yīng)完了這個中斷信號然后又重新置為可以再次接收信號的狀態(tài)智听。從始至終,理解一個關(guān)鍵點屎慢,interrupt()方法僅僅是改變一個標(biāo)志位的值而已,和線程的狀態(tài)并沒有必然的聯(lián)系忽洛。
源碼分析
Thread.interrupt()方法設(shè)計的目的腻惠,是提示一個線程應(yīng)該終止,但不強制該線程終止欲虚。程序員可以來決定如何響應(yīng)這個終止提示集灌。直接上源碼:
//Class java.lang.Thread
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker; //中斷觸發(fā)器
if (b != null) {
interrupt0();
b.interrupt(this); //觸發(fā)回調(diào)接口
return;
}
}
interrupt0();
}
校驗權(quán)限
如果不是當(dāng)前線程自我中斷,會先做一次權(quán)限檢查复哆。如果被中斷的線程屬于系統(tǒng)線程組(即JVM線程)欣喧,checkAccess()方法會使用系統(tǒng)的System.getSecurityManager()來判斷權(quán)限。由于Java默認沒有開啟安全策略梯找,此方法其實會跳過安全檢查唆阿。
觸發(fā)中斷回調(diào)接口
如果線程的中斷觸發(fā)器blocker不為null,則觸發(fā)中斷觸發(fā)回調(diào)接口Interruptible锈锤。那么這個觸發(fā)器blocker是什么時候被設(shè)置的呢驯鳖?
上面提到闲询,如果一個nio通道實現(xiàn)了InterruptibleChannel接口,就可以響應(yīng)interrupt()中斷浅辙,其原理就在InterruptibleChannel接口的抽象實現(xiàn)類AbstractInterruptibleChannel的方法begin()中:
//Class java.nio.channels.spi.AbstractInterruptibleChannel
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (!open)
return;
open = false;
interrupted = target;
try {//關(guān)閉io通道
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor);//將interruptor設(shè)置為當(dāng)前線程的blocker
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
protected final void end(boolean completed) throws AsynchronousCloseException {
blockedOn(null);
Thread interrupted = this.interrupted;
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
if (!completed && !open)
throw new AsynchronousCloseException();
}
//Class java.nio.channels.Channels.ReadableByteChannelImpl
public int read(ByteBuffer dst) throws IOException {
......
try {
begin();
bytesRead = in.read(buf, 0, bytesToRead);
finally {
end(bytesRead > 0);
}
......
}
以上述代碼為例扭弧,nio通道ReadableByteChannel每次執(zhí)行阻塞方法read()前,都會執(zhí)行begin()记舆,把Interruptible回調(diào)接口注冊到當(dāng)前線程上鸽捻,以實現(xiàn)能夠響應(yīng)其他線程的中斷。當(dāng)線程收到中斷時泽腮,Thread.interrupt()觸發(fā)回調(diào)接口御蒲,在回調(diào)接口Interruptible中關(guān)閉io通道并返回,最后在finally塊中執(zhí)行end()盛正,end()方法會檢查中斷標(biāo)記删咱,拋出ClosedByInterruptException。
interrupt0()
無論是否設(shè)置了中斷觸發(fā)回調(diào)blocker豪筝,都會執(zhí)行這個關(guān)鍵的native方法interrupt0():
private native void interrupt0();
以openJDK7的Hotspot虛擬機為例痰滋,先找到native方法映射
//jdk\src\share\native\java\lang\Thread.c
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};
可以找到interrupt0對應(yīng)JVM_Interrupt這個函數(shù),繼續(xù)找到實現(xiàn)代碼
//hotspot\src\share\vm\prims\jvm.cpp
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END
可以看到這是一個JNI方法续崖,JVM_ENTRY是JNI調(diào)用的宏敲街。關(guān)鍵函數(shù)Thread::interrupt(thr),繼續(xù)跟蹤:
//hotspot\src\share\vm\runtime\thread.cpp
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}
關(guān)鍵函數(shù)os::interrupt严望,os此時分為linux多艇、solaris和windows,以linux為例繼續(xù)跟蹤:
//hotspot\src\os\linux\vm\os_linux.cpp
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
osthread->set_interrupted(true);
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
OrderAccess::fence();
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
// For JSR166. Unpark even if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
每一個Java線程都與一個osthread一一對應(yīng)像吻,如果相應(yīng)的os線程沒有被中斷峻黍,則會設(shè)置osthread的interrupt標(biāo)志位為true(對應(yīng)一個volatile int),并喚醒線程的SleepEvent拨匆。隨后喚醒線程的parker和ParkEvent姆涩。
簡而言之,interrupt操作會對三種事件進行unpark喚醒惭每,分別是thread->_SleepEvent骨饿、thread->parker()和thread->_ParkEvent,這些變量的具體聲明如下:
//hotspot\src\share\vm\runtime\thread.cpp
public:
ParkEvent * _ParkEvent ; // for synchronized()
ParkEvent * _SleepEvent ; // for Thread.sleep
// JSR166 per-thread parker
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }
喚醒ParkEvent
Thread類中包含了兩種作用不同的ParkEvent台腥,_ParkEvent變量用于synchronized同步塊和Object.wait()宏赘,_SleepEvent變量用于Thread.sleep(),ParkEvent類的聲明如下:
class ParkEvent : public os::PlatformEvent {
... //略黎侈,直接看父類
}
//hotspot\src\os\linux\vm\os_linux.cpp
class PlatformEvent : public CHeapObj {
private:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
...
public:
PlatformEvent() {
int status;
status = pthread_cond_init (_cond, NULL);
assert_status(status == 0, status, "cond_init");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_Event = 0 ;
_nParked = 0 ;
_Assoc = NULL ;
}
void park () ;
void unpark () ;
int park (jlong millis) ;
...
} ;
ParkEvent包含了一把mutex互斥鎖和一個cond條件變量察署,并在構(gòu)造函數(shù)中進行了初始化,線程的阻塞和喚醒(park和unpark)就是通過他們實現(xiàn)的:
PlatformEvent::park() 方法會調(diào)用庫函數(shù)pthread_cond_wait(_cond, _mutex)實現(xiàn)線程等待峻汉。 synchronized塊的進入和Object.wait()的線程等待都是通過PlatformEvent::park()方法實現(xiàn), (Thread.join()是使用的Object.wait()實現(xiàn)的)
PlatformEvent::park(jlong millis) 方法會調(diào)用庫函數(shù)pthread_cond_timedwait(_cond, _mutex, _abstime)實現(xiàn)計時條件等待,Thread.sleep(millis)就是通過PlatformEvent::park(jlong millis)實現(xiàn)
PlatformEvent::unpark() 方法會調(diào)用庫函數(shù)pthread_cond_signal (_cond)喚醒上述等待的條件變量箕母。Thread.interrupt()就會觸發(fā)其子類SleepEvent和ParkEvent的unpark()方法储藐。synchronized塊的退出也會觸發(fā)unpark()。其所在對象ObjectMonitor維護了ParkEvent數(shù)組作為喚醒隊列嘶是,synchronized同步塊退出時钙勃,會觸發(fā)ParkEvent::unpark()方法來喚醒等待進入同步塊的線程,或等待在Object.wait()的線程聂喇。
上述Thread類的兩個ParkEvent成員變量:_ParkEvent和_SleepEvent辖源,都會在Thread.interrupt()時觸發(fā)unpark()動作。
對于_ParkEvent來說希太,它可以代表一個synchronized等待進入同步塊的時事件克饶,也可以代表一個Object.wait()等待條件變量的事件。不同的是誊辉,如果是synchronized等待事件矾湃,被喚醒后會嘗試獲取鎖,如果失敗則會通過循環(huán)繼續(xù)park()等待堕澄,因此synchronized等待實際上是不會被interrupt()中斷的邀跃;如果是Object.wait()事件,則會通過標(biāo)記為判斷出是否是被notify()喚醒的蛙紫,如果不是則拋出InterruptedException實現(xiàn)中斷拍屑。
對于_SleepEvent相對簡單一些,它只代表線程sleep動作坑傅,可能是java.lang.Thread.sleep()僵驰,也可能是jvm內(nèi)部調(diào)用的線程os::sleep()。如果是java.lang.Thread.sleep()唁毒,則會通過線程的is_interrupted標(biāo)記位來判斷拋出InterruptedException蒜茴。
喚醒Parker
除了喚醒SleepEvent和ParkEvent,Thread.interrupt()還會調(diào)用thread->parker()->unpark()來喚醒Thread的parker變量浆西。Parker類與上面的ParkEvent類很相似粉私,都持有一把mutex互斥鎖和一個cond條件變量。具體代碼見:
class Parker : public os::PlatformParker {
public:
// For simplicity of interface with Java, all forms of park (indefinite,
// relative, and absolute) are multiplexed into one call.
void park(bool isAbsolute, jlong time);
void unpark();
... //略室谚,直接看父類
}
//hotspot\src\os\linux\vm\os_linux.hpp
class PlatformParker : public CHeapObj {
protected:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
public:
PlatformParker() {
int status;
status = pthread_cond_init (_cond, NULL);
assert_status(status == 0, status, "cond_init");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
}
}
與ParkEvent一樣毡鉴,Parker使用著自己的鎖和park()/unpark()方法崔泵。
- Parker::park(bool isAbsolute, jlong time) 方法會調(diào)用庫函數(shù)pthread_cond_timedwait(_cond, _mutex, _abstime)實現(xiàn)計時條件等待秒赤,如果time=0則會直接使用pthread_cond_wait(_cond, _mutex)實現(xiàn)線程等待
- Parker::unpark() 方法會調(diào)用庫函數(shù)pthread_cond_signal (_cond)喚醒上述等待的條件變量
如源碼注釋,Thread的_parker變量更具有通用性憎瘸。凡是在Java代碼里通過unsafe.park()/unpark()的調(diào)用都會對應(yīng)到Thread的_parker變量去執(zhí)行入篮。而unsafe.park()/unpark()由java.util.concurrent.locks.LockSupport類調(diào)用,它支持了java.util.concurrent的各種鎖幌甘、條件變量等線程同步操作潮售。例如:ReentrantLock, CountDownLatch, ReentrantReadWriteLock, Semaphore, ThreadPoolExecutor, ConditionObject, ArrayBlockingQueue等痊项。
線程被Thread.interrupt()中斷時,并不意味著上述類的等待方法都會返回并拋出InterruptedException酥诽。盡管上述類最終等待在的unsafe.unpark()方法都會被喚醒鞍泉,其是否繼續(xù)執(zhí)行park()等待仍取決于具體實現(xiàn)。例如肮帐,Lock.lock()方法不會響應(yīng)中斷咖驮,Lock.lockInterruptibly()方法則會響應(yīng)中斷并拋出異常,二者實現(xiàn)的區(qū)別就在于park()等待被喚醒時是否繼續(xù)執(zhí)行park()來等待鎖
如何處理InterruptedException
- 方式1?如果自己很清楚當(dāng)前線程被中斷后的處理方式训枢,則按自己的方式處理托修。 通常是做好善后工作,主動退出線程
- 方式2?直接在方法聲明中throws InterruptedException恒界,丟給上層處理睦刃。這種方式也很常見,將中斷的處置權(quán)交給具體的業(yè)務(wù)來處理
- 方式3?重新設(shè)置中斷標(biāo)記位十酣,Thread.currentThread().interrupt()涩拙,交給后續(xù)方法處理
原因是底層拋出InterruptedException時會清除中斷標(biāo)記位,捕獲到異常后如果不想處理婆誓,可以重新設(shè)置中斷標(biāo)記位
try {
...
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
注意?請不要吞掉InterruptedException吃环,可能會導(dǎo)致上層的調(diào)用方出現(xiàn)不可預(yù)料的結(jié)果
小結(jié)
終止一個Java線程最好的方式,就是讓run()方法主動退出洋幻。因為強制的讓一個線程被動的退出是很不安全的郁轻,內(nèi)部的數(shù)據(jù)不一致會對程序造成不可預(yù)知的后果。
為了能夠通知一個線程需要被終止文留,Java提供了Thread.interrupt()方法好唯,該方法會設(shè)置線程中斷的標(biāo)記位,并喚醒可中斷的阻塞方法燥翅,包括Thread.sleep()骑篙,Object.wait(),nio通道的IO等待森书,以及LockSupport.park()靶端。識別一個方法是否會被中斷,只需要看其聲明中是否會throws InterruptedException或ClosedByInterruptException凛膏。
每個Java線程都會對應(yīng)一個osthread杨名,它持有了三種條件變量,分別用于Thread.sleep()猖毫,Object.wait()和unsafe.park()台谍。Thread.interrupt()會依次喚醒三個條件變量,以達到中斷的目的吁断。線程的同步與喚醒最終都使用了pthread_cond_wait和pthread_cond_signal這些pthread庫函數(shù)趁蕊。