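This article uses the embedded Tomcat shipped with Spring Boot as an example to walk through Tomcat's startup process, thread pool management, and request handling.
We start with org.apache.tomcat.util.net.NioEndpoint#startInternal: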
@Override
public void startInternal() throws Exception {
    if (!running) {
        running = true;
        paused = false;
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());
        // Create worker collection
        if ( getExecutor() == null ) {
            createExecutor();
        }
        initializeConnectionLatch();
        // Start poller threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        startAcceptorThreads();
    }
}
The part to focus on here is the creation of the Executor: the code checks whether an Executor already exists and, if it does not, calls createExecutor().
public void createExecutor() {
    internalExecutor = true;
    TaskQueue taskqueue = new TaskQueue();
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
    taskqueue.setParent( (ThreadPoolExecutor) executor);
}
TaskThreadFactory is a ThreadFactory implementation that defines the name prefix and priority of the worker threads. createExecutor() then instantiates a ThreadPoolExecutor. This ThreadPoolExecutor is Tomcat's own implementation, org.apache.tomcat.util.threads.ThreadPoolExecutor, which extends the JUC class java.util.concurrent.ThreadPoolExecutor; we can think of it as a thin wrapper around the pool that JUC provides. The resulting instance is assigned to the endpoint's executor field, and that completes the creation of the Executor.
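Note that the core threads are already created when new ThreadPoolExecutor(...) runs, because the constructor of Tomcat's ThreadPoolExecutor calls prestartAllCoreThreads(). These threads are created through TaskThreadFactory, so it is worth a quick look at its thread creation method: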
@Override
public Thread newThread(Runnable r) {
    TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
    t.setDaemon(daemon);
    t.setPriority(threadPriority);
    // Set the context class loader of newly created threads to be the class
    // loader that loaded this factory. This avoids retaining references to
    // web application class loaders and similar.
    if (Constants.IS_SECURITY_ENABLED) {
        PrivilegedAction<Void> pa = new PrivilegedSetTccl(
                t, getClass().getClassLoader());
        AccessController.doPrivileged(pa);
    } else {
        t.setContextClassLoader(getClass().getClassLoader());
    }
    return t;
}
As you can see, the thread name is already fixed at this point; it looks like http-nio-8080-exec-3.
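To make the naming and prestart behaviour concrete, here is a minimal sketch built only on the JDK. The class WorkerPoolSketch and the methods namedFactory and createPool are hypothetical names for illustration, and a plain LinkedBlockingQueue stands in for Tomcat's TaskQueue, so the queueing policy is not the same as Tomcat's.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerPoolSketch {

    /** Hypothetical stand-in for TaskThreadFactory: name prefix, daemon flag, priority. */
    static ThreadFactory namedFactory(String prefix, boolean daemon, int priority) {
        AtomicInteger threadNumber = new AtomicInteger(1);
        return r -> {
            Thread t = new Thread(r, prefix + threadNumber.getAndIncrement());
            t.setDaemon(daemon);
            t.setPriority(priority);
            return t;
        };
    }

    /** Builds a JUC pool and starts its core threads eagerly (assumption: mirrors Tomcat's constructor). */
    static ThreadPoolExecutor createPool(String endpointName, int minSpareThreads, int maxThreads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minSpareThreads, maxThreads, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), // stand-in for TaskQueue
                namedFactory(endpointName + "-exec-", true, Thread.NORM_PRIORITY));
        pool.prestartAllCoreThreads();
        return pool;
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = createPool("http-nio-8080", 2, 4);
        // Core threads exist before any task has been submitted.
        System.out.println("threads after construction: " + pool.getPoolSize());
        // The task prints the name of whichever worker picks it up.
        pool.submit(() -> System.out.println(Thread.currentThread().getName())).get();
        pool.shutdown();
    }
}
```

Without the prestartAllCoreThreads() call, a plain JUC pool would create its threads lazily as tasks arrive, which is the main difference this sketch is meant to show.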
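Back in startInternal, the next step is to create the pollers and start their threads. The size of the pollers array defaults to the smaller of 2 and the number of processor cores. The Poller constructor looks like this: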
public Poller() throws IOException {
    this.selector = Selector.open();
}
Note that Poller implements the Runnable interface and is an inner class of NioEndpoint.
/**
 * Poller class.
 */
public class Poller implements Runnable {
    ...
}
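The poller startup described above can be sketched with the JDK alone. MiniPoller, defaultPollerCount, and the one-second select timeout are assumptions made for this illustration; the real Poller does far more work inside its loop.

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class PollerStartupSketch {

    /** Hypothetical, heavily simplified poller: owns one Selector and runs on its own thread. */
    static class MiniPoller implements Runnable {
        final Selector selector;
        volatile boolean close = false;

        MiniPoller() throws IOException {
            this.selector = Selector.open();
        }

        @Override
        public void run() {
            try {
                while (!close) {
                    selector.select(1000); // block for up to one second, or until wakeup()
                }
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        void destroy() {
            close = true;
            selector.wakeup(); // make a blocked select() return so the loop sees the flag
        }
    }

    /** The default described in the text: the smaller of 2 and the number of cores. */
    static int defaultPollerCount() {
        return Math.min(2, Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) throws Exception {
        MiniPoller[] pollers = new MiniPoller[defaultPollerCount()];
        Thread[] threads = new Thread[pollers.length];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new MiniPoller();
            threads[i] = new Thread(pollers[i], "http-nio-8080-ClientPoller-" + i);
            threads[i].setDaemon(true);
            threads[i].start();
        }
        for (MiniPoller p : pollers) {
            p.destroy();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("started and stopped " + pollers.length + " poller(s)");
    }
}
```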
That concludes the walkthrough of startInternal. Next, let's look at Poller's run method:
@Override
public void run() {
    // Loop until destroy() is called
    while (true) {
        boolean hasEvents = false;
        try {
            if (!close) {
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("",x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());
        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                processKey(sk, attachment);
            }
        }//while
        //process timeouts
        timeout(keyCount,hasEvents);
    }//while
    getStopLatch().countDown();
}
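The core logic is a while loop: each pass selects the ready keys and then iterates over the resulting SelectionKey instances. When a request arrives, its key is handed to processKey for processing.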
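The select-then-iterate pattern can be tried in isolation with a small loopback example. This is only a sketch under stated assumptions: SelectorLoopSketch and serveOnce are hypothetical names, the five-second deadline is arbitrary, and the same loop also accepts the connection, whereas startInternal starts separate acceptor threads for that job.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectorLoopSketch {

    /** Sends an ASCII message over loopback and returns what the selector loop read. */
    static String serveOnce(String asciiMessage) {
        SocketChannel accepted = null;
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap(asciiMessage.getBytes(StandardCharsets.US_ASCII)));

                StringBuilder received = new StringBuilder();
                long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 seconds
                while (received.length() < asciiMessage.length() && System.nanoTime() < deadline) {
                    int keyCount = selector.select(1000);
                    Iterator<SelectionKey> iterator =
                            keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // Walk the ready keys, removing each one before handling it.
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        iterator.remove();
                        if (sk.isAcceptable()) {
                            accepted = server.accept();
                            if (accepted != null) {
                                accepted.configureBlocking(false);
                                accepted.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (sk.isReadable()) {
                            ByteBuffer buf = ByteBuffer.allocate(256);
                            int n = ((SocketChannel) sk.channel()).read(buf);
                            if (n < 0) {
                                sk.cancel(); // peer closed the connection
                                continue;
                            }
                            buf.flip();
                            received.append(StandardCharsets.US_ASCII.decode(buf));
                        }
                    }
                }
                return received.toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (accepted != null) {
                try {
                    accepted.close();
                } catch (IOException ignored) {
                    // nothing useful to do while cleaning up
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + serveOnce("GET / HTTP/1.1"));
    }
}
```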
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
    }
}
This method inspects the NIO SelectionKey. For an incoming request sk.isReadable() is true, so the key ends up being handed to processSocket.
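The bit tests behind sk.isReadable() and sk.isWritable(), and the read-before-write ordering, can be shown on plain integers. The sketch below assumes that unreg clears the ready operations from the key's interest set so the poller does not fire again while a worker is busy; that body is not shown in this article, so treat remainingInterest as an illustration of the idea. ReadyOpsSketch and eventsFor are hypothetical names.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

public class ReadyOpsSketch {

    /** Assumed effect of unreg: drop the operations that just became ready. */
    static int remainingInterest(int interestOps, int readyOps) {
        return interestOps & ~readyOps;
    }

    /** Event names in dispatch order: read goes before write. */
    static List<String> eventsFor(int readyOps) {
        List<String> events = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_READ) != 0) {
            events.add("OPEN_READ");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            events.add("OPEN_WRITE");
        }
        return events;
    }

    public static void main(String[] args) {
        int interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        int ready = SelectionKey.OP_READ;
        // Only OP_WRITE stays registered after the read event is taken.
        System.out.println(remainingInterest(interest, ready) == SelectionKey.OP_WRITE);
        System.out.println(eventsFor(interest));
    }
}
```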
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
Here getExecutor returns the Executor instance created earlier, and execute submits the request task to it. From this point on, the request travels through Tomcat to the Servlet and finally reaches the corresponding handler, which performs the business logic.
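The dispatch decision in processSocket boils down to a few lines. The sketch below keeps only that decision; DispatchSketch and process are hypothetical names, a plain Runnable replaces SocketProcessorBase, and the processor cache and logging are left out.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

public class DispatchSketch {

    /**
     * Hands the task to the executor when dispatch is requested and an executor
     * exists; otherwise runs it on the calling thread. Returns false if the
     * executor rejects the task.
     */
    static boolean process(Runnable task, Executor executor, boolean dispatch) {
        try {
            if (dispatch && executor != null) {
                executor.execute(task);
            } else {
                task.run();
            }
        } catch (RejectedExecutionException ree) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Runnable task = () -> System.out.println("handled on " + Thread.currentThread().getName());
        // An executor that always rejects, simulating a full pool and queue.
        Executor rejecting = r -> {
            throw new RejectedExecutionException("pool and queue are full");
        };
        System.out.println(process(task, null, true));       // no executor: runs inline
        System.out.println(process(task, rejecting, true));  // rejected: reports failure
        System.out.println(process(task, rejecting, false)); // dispatch off: runs inline
    }
}
```

A false return value is what makes processKey set closeSocket and cancel the key, so a rejected task ends with the connection being closed rather than silently dropped.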
That's all.