概述
Thread提供了interrupt方法茶袒,中斷線程的執(zhí)行:
- 如果線程堵塞在object.wait羽戒、Thread.join和Thread.sleep勺远,將會(huì)拋出InterruptedException,同時(shí)清除線程的中斷狀態(tài);
- 如果線程堵塞在java.nio.channels.InterruptibleChannel的IO上首尼,Channel將會(huì)被關(guān)閉蝶锋,線程被置為中斷狀態(tài)陆爽,并拋出java.nio.channels.ClosedByInterruptException;
- 如果線程堵塞在java.nio.channels.Selector上扳缕,線程被置為中斷狀態(tài)慌闭,select方法會(huì)馬上返回恶守,類似調(diào)用wakeup的效果;
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
源碼實(shí)現(xiàn)
之前在分析Thread.start的時(shí)候曾經(jīng)提到贡必,JavaThread有三個(gè)成員變量:
//用于synchronized同步塊和Object.wait()
ParkEvent * _ParkEvent ;
//用于Thread.sleep()
ParkEvent * _SleepEvent ;
//用于unsafe.park()/unpark(),供java.util.concurrent.locks.LockSupport調(diào)用兔港,
//因此它支持了java.util.concurrent的各種鎖、條件變量等線程同步操作,是concurrent的實(shí)現(xiàn)基礎(chǔ)
Parker* _parker;
初步猜測(cè)interrupt實(shí)現(xiàn)應(yīng)該與此有關(guān)系仔拟;
interrupt方法的源碼也在jvm.cpp文件:
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END
JVM_Interrupt對(duì)參賽進(jìn)行了校驗(yàn)衫樊,然后直接調(diào)用Thread::interrupt:
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}
Thread::interrupt調(diào)用os::interrupt方法實(shí)現(xiàn),os::interrupt方法定義在os_linux.cpp:
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
//獲取系統(tǒng)native線程對(duì)象
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
osthread->set_interrupted(true);
//內(nèi)存屏障,使osthread的interrupted狀態(tài)對(duì)其它線程立即可見(jiàn)
OrderAccess::fence();
//前文說(shuō)過(guò)利花,_SleepEvent用于Thread.sleep,線程調(diào)用了sleep方法科侈,則通過(guò)unpark喚醒
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
//_parker用于concurrent相關(guān)的鎖,此處同樣通過(guò)unpark喚醒
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
//synchronized同步塊和Object.wait() 喚醒
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
由此可見(jiàn)炒事,interrupt其實(shí)就是通過(guò)ParkEvent的unpark方法喚醒對(duì)象臀栈;另外要注意:
- object.wait、Thread.sleep和Thread.join會(huì)拋出InterruptedException并清除中斷狀態(tài)挠乳;
- Lock.lock()方法不會(huì)響應(yīng)中斷权薯,Lock.lockInterruptibly()方法則會(huì)響應(yīng)中斷并拋出異常,區(qū)別在于park()等待被喚醒時(shí)lock會(huì)繼續(xù)執(zhí)行park()來(lái)等待鎖睡扬,而 lockInterruptibly會(huì)拋出異常盟蚣;
- synchronized被喚醒后會(huì)嘗試獲取鎖,失敗則會(huì)通過(guò)循環(huán)繼續(xù)park()等待卖怜,因此實(shí)際上是不會(huì)被interrupt()中斷的;
- 一般情況下屎开,拋出異常時(shí),會(huì)清空Thread的interrupt狀態(tài)马靠,在編程時(shí)需要注意奄抽;
網(wǎng)絡(luò)相關(guān)的中斷
之前的interrupt方法有這么一段:
private volatile Interruptible blocker;
private final Object blockerLock = new Object();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
其中blocker是Thread的成員變量,Thread提供了blockedOn方法可以設(shè)置blocker:
void blockedOn(Interruptible b) {
synchronized (blockerLock) {
blocker = b;
}
}
如果一個(gè)nio通道實(shí)現(xiàn)了InterruptibleChannel接口,就可以響應(yīng)interrupt()中斷甩鳄,其原理就在InterruptibleChannel接口的抽象實(shí)現(xiàn)類AbstractInterruptibleChannel的方法begin()中:
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (!open)
return;
open = false;
interrupted = target;
try {
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor);//設(shè)置當(dāng)前線程的blocker為interruptor
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
protected final void end(boolean completed)
throws AsynchronousCloseException
{
blockedOn(null);//設(shè)置當(dāng)前線程的blocker為null
Thread interrupted = this.interrupted;
//如果發(fā)生中斷逞度,Thread.interrupt方法會(huì)調(diào)用Interruptible的interrupt方法,
//設(shè)置this.interrupted為當(dāng)前線程
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
if (!completed && !open)
throw new AsynchronousCloseException();
}
//Class java.nio.channels.Channels.WritableByteChannelImpl
public int write(ByteBuffer src) throws IOException {
......
try {
begin();
out.write(buf, 0, bytesToWrite);
finally {
end(bytesToWrite > 0);
}
......
}
//Class java.nio.channels.Channels.ReadableByteChannelImpl
public int read(ByteBuffer dst) throws IOException {
......
try {
begin();
bytesRead = in.read(buf, 0, bytesToRead);
finally {
end(bytesRead > 0);
}
......
}
以上述代碼為例娩贷,nio通道的ReadableByteChannel每次執(zhí)行阻塞方法read()前第晰,都會(huì)執(zhí)行begin(),把Interruptible回調(diào)接口注冊(cè)到當(dāng)前線程上彬祖。當(dāng)線程中斷時(shí)茁瘦,Thread.interrupt()觸發(fā)回調(diào)接口Interruptible關(guān)閉io通道,導(dǎo)致read方法返回,最后在finally塊中執(zhí)行end()方法檢查中斷標(biāo)記储笑,拋出ClosedByInterruptException;
Selector的實(shí)現(xiàn)類似:
//java.nio.channels.spi.AbstractSelector
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread ignore) {
AbstractSelector.this.wakeup();
}};
}
AbstractInterruptibleChannel.blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
protected final void end() {
AbstractInterruptibleChannel.blockedOn(null);
}
//sun.nio.ch.class EPollSelectorImpl
protected int doSelect(long timeout) throws IOException {
......
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
......
}
可以看到當(dāng)發(fā)生中斷時(shí)會(huì)調(diào)用wakeup方法喚醒poll方法甜熔,但并不會(huì)拋出中斷異常;