離上次tomcat的代碼走讀過了很長時間了剩晴,這段時間主要是在自己造個類似web容器的輪子深碱,用來加深NIO的理解。同時大致走讀了下undertow相關(guān)的代碼。今天繼續(xù)tomcat的代碼走讀讯泣。主要內(nèi)容是tomcat如何從接收請求到處理請求并回執(zhí)衅枫。
首先看下tomcat處理請求的線程模型嫁艇。
tomcat處理請求主要有三種線程,這里就用線程名里的關(guān)鍵字來標識弦撩。
- Acceptor線程 主要是接收請求 步咪。也是一個線程。
- ClientPoller線程主要是向selector中注冊socket益楼,并分發(fā)到處理線程中猾漫。這是一個線程。
- exec線程 主要負責處理真正的請求感凤。調(diào)用servlet的service方法悯周。這是一個線程池。
下面從源碼層面看是如何處理一個請求的陪竿。
首先看上面說的三個線程的啟動禽翼。
// Create worker collection
if (getExecutor() == null) {
//工作線程
createExecutor();
}
initializeConnectionLatch();
// Start poller thread
//分發(fā)線程
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
//接收請求線程
startAcceptorThread();
工作線程的源碼如下:
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
這段代碼比較簡單,主要是創(chuàng)建一個工作的線程池萨惑,線程池的初始線程數(shù)默認是10個捐康,最大線程數(shù)默認是200。
下面看Acceptor的處理邏輯庸蔼。
先上時序圖:
1解总、Acceptor在啟動后,在run方法里調(diào)用accept方法姐仅,該方法會阻塞直到有連接到來花枫。
2、在獲取到channel后掏膏,將該channel封裝成NioChannel劳翰,并注冊到Poller中去。注冊的過程就是簡單的把NioChannel封裝成的PollerEvent add到事件隊列中馒疹。
至此佳簸,Acceptor完成了自己的使命了,接下來就是循環(huán)等待下一個請求的到來。
相關(guān)的源碼如下生均,有些跟主體流程無關(guān)的代碼就直接隱去了听想。
@Override
public void run() {
// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
try {
//if we have reached max connections, wait
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
//獲取連接,線程會阻塞在這個地方马胧,直到有新的連接請求過來汉买。
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions里代碼比較長,主要是將獲取的channel封裝成時間佩脊,注冊到poller的事件隊列里供Poller線程分發(fā)蛙粘。
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
。威彰。出牧。
}
。抱冷。崔列。
}
下面來看下setSocketOptions()
的實現(xiàn)
/**
* Process the specified connection.
* @param socket The socket channel
* @return <code>true</code> if the socket was correctly configured
* and processing may continue, <code>false</code> if the socket needs to be
* close immediately
*/
@Override
protected boolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null;
try {
// Allocate channel and wrapper
NioChannel channel = null;
if (nioChannels != null) {
channel = nioChannels.pop();
}
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(bufhandler, selectorPool, this);
} else {
channel = new NioChannel(bufhandler);
}
}
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
channel.reset(socket, newWrapper);
connections.put(socket, newWrapper);
socketWrapper = newWrapper;
// Set socket properties
// Disable blocking, polling will be used
socket.configureBlocking(false);
socketProperties.setProperties(socket.socket());
socketWrapper.setReadTimeout(getConnectionTimeout());
socketWrapper.setWriteTimeout(getConnectionTimeout());
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
socketWrapper.setSecure(isSSLEnabled());
poller.register(channel, socketWrapper);
return true;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
if (socketWrapper == null) {
destroySocket(socket);
}
}
// Tell to close the socket if needed
return false;
}
代碼很簡單,就不做細致的分析與注釋了旺遮。
register
方法代碼:
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent event = null;
if (eventCache != null) {
event = eventCache.pop();
}
if (event == null) {
event = new PollerEvent(socket, OP_REGISTER);
} else {
event.reset(socket, OP_REGISTER);
}
addEvent(event);
}
addEvent
代碼
private void addEvent(PollerEvent event) {
events.offer(event);
if (wakeupCounter.incrementAndGet() == 0) {
selector.wakeup();
}
}
到這里,Acceptor代碼完成盈咳。請求已經(jīng)接進來了耿眉。下面看Poller線程如何分發(fā)請求到worker線程池里。
1鱼响、poller線程run方法里鸣剪,第一件事情是處理事件隊列里的事件,將Acceptor里注冊的channel注冊到Poller中的selector中丈积。
2筐骇、在selector中選取準備好的channel,循環(huán)進行處理江滨。
3铛纬、將步驟二中選取的連接丟到工作線程池中進行處理。
至此唬滑,Poller的工作就完成了告唆。接下來就是循環(huán)等待新的事件。
相關(guān)源碼大致為:
run()
的代碼如下:
/**
* The background thread that adds sockets to the Poller, checks the
* poller for triggered events and hands the associated socket off to an
* appropriate processor as events occur.
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
// If we are here, means we have other stuff to do
// Do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
......
// Either we timed out or we woke up, process events first
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
if (socketWrapper == null) {
iterator.remove();
} else {
iterator.remove();
//開始處理秀暗中的連接
processKey(sk, socketWrapper);
}
}
}
}
processKey()
的源碼:
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
try {
...
...
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
.....
if (closeSocket) {
cancelledKey(sk, socketWrapper);
}
}
}
} else {
// Invalid key
cancelledKey(sk, socketWrapper);
}
} catch (CancelledKeyException ckx) {
....
}
processKey
主要功能是構(gòu)造processSocket
方法的參數(shù)晶密。下面看processSocket
相關(guān)的代碼:
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
上面的方法就是向工作線程池里丟一個任務(wù)進行處理擒悬。丟進去的線程為SocketProcessor
,該類繼承自SocketProcessorBase,這個類實現(xiàn)了Runnable。
到這里稻艰,Poller相關(guān)的代碼大致走完懂牧。下面就看worker線程里做了什么事情了。這里就正式的跟servlet打交道了尊勿。
我們主要看SocketProcessor
類的run()
方法的處理邏輯僧凤。
按照慣例畜侦,先上時序圖。
從時序圖上可以清楚的了解worker的工作流程拼弃。這里就不做詳細的說明了夏伊。
關(guān)于servlet相關(guān)的處理,放在下篇文章里進行詳細解讀吻氧。
到此溺忧,tomcat接收請求的過程源碼大體已經(jīng)處理完成了。接下來就是servlet的處理了盯孙。