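Having analyzed Tomcat's startup process and its individual components, this article turns to how Tomcat handles requests. Recall the end of Tomcat Startup Analysis (6): in the processSocket method of the AbstractEndpoint class, the task executed by the worker thread pool is a SocketProcessorBase. This article starts its analysis from SocketProcessorBase.
SocketProcessorBase class
SocketProcessorBase is an abstract class that implements the Runnable interface; its run method calls the abstract doRun method: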
public abstract class SocketProcessorBase<S> implements Runnable {
    protected SocketWrapperBase<S> socketWrapper;
    protected SocketEvent event;
    public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        reset(socketWrapper, event);
    }
    public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        Objects.requireNonNull(event);
        this.socketWrapper = socketWrapper;
        this.event = event;
    }
    @Override
    public final void run() {
        synchronized (socketWrapper) {
            // It is possible that processing may be triggered for read and
            // write at the same time. The sync above makes sure that processing
            // does not occur in parallel. The test below ensures that if the
            // first event to be processed results in the socket being closed,
            // the subsequent events are not processed.
            if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }
    protected abstract void doRun();
}
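The pattern in run() can be tried in isolation. The following is a minimal sketch, not Tomcat code: DemoWrapper and DemoProcessorBase are hypothetical stand-ins that keep only the lock, the closed check, and the abstract doRun hook. It shows that once the first event closes the socket, a second event on the same wrapper is skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SocketWrapperBase: it only tracks a closed flag.
class DemoWrapper {
    private boolean closed;
    boolean isClosed() { return closed; }
    void close() { closed = true; }
}

// Same shape as SocketProcessorBase: run() is final, subclasses supply doRun().
abstract class DemoProcessorBase implements Runnable {
    protected final DemoWrapper wrapper;

    DemoProcessorBase(DemoWrapper wrapper) { this.wrapper = wrapper; }

    @Override
    public final void run() {
        synchronized (wrapper) {        // one event at a time per socket
            if (wrapper.isClosed()) {   // an earlier event closed the socket
                return;
            }
            doRun();
        }
    }

    protected abstract void doRun();
}

class TemplateRunDemo {
    // Runs two events against one wrapper; the first one closes it.
    static List<String> runTwoEvents() {
        List<String> log = new ArrayList<>();
        DemoWrapper shared = new DemoWrapper();
        Runnable first = new DemoProcessorBase(shared) {
            @Override protected void doRun() { log.add("read"); wrapper.close(); }
        };
        Runnable second = new DemoProcessorBase(shared) {
            @Override protected void doRun() { log.add("write"); }
        };
        first.run();
        second.run();   // skipped because the wrapper is already closed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runTwoEvents());
    }
}
```

Making run() final keeps the locking and the closed check in one place, so a subclass cannot bypass them by accident.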
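The SocketProcessor returned by NioEndpoint's createSocketProcessor function is an inner class of NioEndpoint. It extends the abstract class SocketProcessorBase and implements the doRun method. The class comment states that SocketProcessor plays a role equivalent to that of the Worker.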
/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }
    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        try {
            int handshake = -1;
            try {
                if (key != null) {
                    if (socket.isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        event = SocketEvent.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
                if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                if (event == null) {
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ){
                socketWrapper.registerReadInterest();
            } else if (handshake == SelectionKey.OP_WRITE){
                socketWrapper.registerWriteInterest();
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key);
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            socket.getPoller().cancelledKey(key);
        } finally {
            socketWrapper = null;
            event = null;
            //return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}
- The handshake-related variables and functions all concern the SSL handshake and are ignored for now; NioChannel's handshake function simply returns 0;
- The real processing happens in the getHandler().process call.
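The way doRun interprets the handshake value can be summarized as a small lookup. This is only an illustrative sketch: the Action enum and the actionFor method are hypothetical names that do not exist in Tomcat, while the compared values (0, -1, SelectionKey.OP_READ, SelectionKey.OP_WRITE) are the ones used in the listing above.

```java
import java.nio.channels.SelectionKey;

class HandshakeDispatchDemo {
    // Hypothetical labels for the outcomes handled in doRun.
    enum Action { PROCESS, CLOSE, REGISTER_READ, REGISTER_WRITE, NONE }

    // Maps the handshake result code to the branch doRun would take.
    static Action actionFor(int handshake) {
        if (handshake == 0) {
            return Action.PROCESS;          // handshake done: hand over to the Handler
        } else if (handshake == -1) {
            return Action.CLOSE;            // handshake failed: close the socket
        } else if (handshake == SelectionKey.OP_READ) {
            return Action.REGISTER_READ;    // handshake needs more input
        } else if (handshake == SelectionKey.OP_WRITE) {
            return Action.REGISTER_WRITE;   // handshake needs to write more output
        }
        return Action.NONE;                 // no branch matches
    }

    public static void main(String[] args) {
        System.out.println(actionFor(0));
        System.out.println(actionFor(SelectionKey.OP_WRITE));
    }
}
```

For a plain (non-TLS) connection only the first branch matters, which is why the walkthrough can go straight to getHandler().process.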
The getHandler function is defined in the AbstractEndpoint class; the relevant code is as follows:
/**
 * Handling of accepted sockets.
 */
private Handler<S> handler = null;
public void setHandler(Handler<S> handler ) { this.handler = handler; }
public Handler<S> getHandler() { return handler; }
When is the Handler assigned? The answer lies in the creation of the Http11NioProtocol object: the parent class constructor assigns the endpoint's Handler when it is called:
public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
    super(endpoint);
    setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
    ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
    setHandler(cHandler);
    getEndpoint().setHandler(cHandler);
}
ConnectionHandler class
ConnectionHandler is a static inner class of AbstractProtocol. The code above calls its process function, which is too long to show here. Given the author's limited expertise, this article only analyzes the code for the most common scenario:
- processor = getProtocol().createProcessor();
  createProcessor is an abstract method defined in AbstractProtocol, and AbstractHttp11Protocol implements it:
  @Override
  protected Processor createProcessor() {
      Http11Processor processor = new Http11Processor(getMaxHttpHeaderSize(),
              getAllowHostHeaderMismatch(), getRejectIllegalHeaderName(), getEndpoint(),
              getMaxTrailerSize(), allowedTrailerHeaders, getMaxExtensionSize(),
              getMaxSwallowSize(), httpUpgradeProtocols, getSendReasonPhrase(),
              relaxedPathChars, relaxedQueryChars);
      processor.setAdapter(getAdapter());
      processor.setMaxKeepAliveRequests(getMaxKeepAliveRequests());
      processor.setConnectionUploadTimeout(getConnectionUploadTimeout());
      processor.setDisableUploadTimeout(getDisableUploadTimeout());
      processor.setCompressionMinSize(getCompressionMinSize());
      processor.setCompression(getCompression());
      processor.setNoCompressionUserAgents(getNoCompressionUserAgents());
      processor.setCompressibleMimeTypes(getCompressibleMimeTypes());
      processor.setRestrictedUserAgents(getRestrictedUserAgents());
      processor.setMaxSavePostSize(getMaxSavePostSize());
      processor.setServer(getServer());
      processor.setServerRemoveAppProvidedValues(getServerRemoveAppProvidedValues());
      return processor;
  }
- state = processor.process(wrapper, status); this line delegates to the Processor's process function, which continues handling the request.
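The two steps above, obtaining a Processor and then delegating to it, can be sketched as follows. This is a simplified illustration, not the real ConnectionHandler: the per-socket map, the String socket id, and the reduced Event and State enums are all assumptions made for this example, and the Supplier plays the role of createProcessor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class DelegationDemo {
    enum Event { OPEN_READ, DISCONNECT }
    enum State { OPEN, CLOSED }

    // Hypothetical, reduced version of the Processor contract.
    interface Processor {
        State process(String socketId, Event event);
    }

    // Hypothetical handler: finds or creates a processor, then delegates to it.
    static class Handler {
        private final Supplier<Processor> createProcessor;
        private final Map<String, Processor> connections = new HashMap<>();
        int created = 0;   // counts factory calls, for demonstration only

        Handler(Supplier<Processor> createProcessor) {
            this.createProcessor = createProcessor;
        }

        State process(String socketId, Event event) {
            Processor processor = connections.get(socketId);
            if (processor == null) {
                processor = createProcessor.get();   // stands in for createProcessor()
                created++;
                connections.put(socketId, processor);
            }
            State state = processor.process(socketId, event);
            if (state == State.CLOSED) {
                connections.remove(socketId);        // forget processors of closed sockets
            }
            return state;
        }
    }

    public static void main(String[] args) {
        Handler handler = new Handler(() -> (id, ev) ->
                ev == Event.OPEN_READ ? State.OPEN : State.CLOSED);
        System.out.println(handler.process("socket-1", Event.OPEN_READ));
        System.out.println(handler.created);
    }
}
```

The handler itself contains no protocol logic; everything specific to HTTP/1.1 lives behind the Processor it obtains.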
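Processor interface
The Processor interface is used to process protocol requests; its class hierarchy is shown in the figure below.
The process function of Http11Processor is defined in its parent class AbstractProcessorLight. The code is as follows; it goes on to call the abstract service method to handle the request.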
@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
        throws IOException {
    SocketState state = SocketState.CLOSED;
    Iterator<DispatchType> dispatches = null;
    do {
        if (dispatches != null) {
            DispatchType nextDispatch = dispatches.next();
            state = dispatch(nextDispatch.getSocketStatus());
        } else if (status == SocketEvent.DISCONNECT) {
            // Do nothing here, just wait for it to get recycled
        } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
            state = dispatch(status);
            if (state == SocketState.OPEN) {
                // There may be pipe-lined data to read. If the data isn't
                // processed now, execution will exit this loop and call
                // release() which will recycle the processor (and input
                // buffer) deleting any pipe-lined data. To avoid this,
                // process it now.
                state = service(socketWrapper);
            }
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event likely after async, ignore
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ){
            state = service(socketWrapper);
        } else {
            // Default to closing the socket if the SocketEvent passed in
            // is not consistent with the current state of the Processor
            state = SocketState.CLOSED;
        }
        // some code omitted
    } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);
    return state;
}
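For an ordinary, non-async request only a few of these branches matter. The sketch below keeps just those branches; it deliberately leaves out the do/while loop, the dispatches iterator, and the async/upgrade branch, and the reduced Event and State enums are hypothetical simplifications of SocketEvent and SocketState.

```java
abstract class LightProcessorDemo {
    enum Event { OPEN_READ, OPEN_WRITE, DISCONNECT, TIMEOUT }
    enum State { OPEN, CLOSED, LONG }

    // Reduced version of process(): picks a branch from the event alone.
    State process(Event status) {
        State state = State.CLOSED;
        if (status == Event.DISCONNECT) {
            // nothing to do; the initial CLOSED state is returned
        } else if (status == Event.OPEN_WRITE) {
            state = State.LONG;        // stray write event, keep the connection
        } else if (status == Event.OPEN_READ) {
            state = service();         // the usual path for a new request
        } else {
            state = State.CLOSED;      // event does not fit the processor state
        }
        return state;
    }

    // Supplied by the protocol-specific subclass, as in Http11Processor.
    protected abstract State service();
}

class LightProcessorMain {
    public static void main(String[] args) {
        LightProcessorDemo processor = new LightProcessorDemo() {
            @Override protected State service() { return State.OPEN; }
        };
        System.out.println(processor.process(LightProcessorDemo.Event.OPEN_READ));
    }
}
```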
Http11Processor implements its own service method, which is too long to show here. The important step is the line getAdapter().service(request, response);.
Http11Processor's adapter is set after the object is created. The argument comes from AbstractProtocol's getAdapter method, and AbstractProtocol's adapter is in turn assigned when the Connector is initialized:
@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();
    // Initialize adapter
    adapter = new CoyoteAdapter(this);
    protocolHandler.setAdapter(adapter);
    // some code omitted
}
CoyoteAdapter class
The Adapter set on the ProtocolHandler is of type CoyoteAdapter; its service method is as follows:
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);
    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);
        // Link objects
        request.setResponse(response);
        response.setRequest(request);
        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);
        // Set query string encoding
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }
    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }
    boolean async = false;
    boolean postParseSuccess = false;
    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }
            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            request.finishRequest();
            response.finishResponse();
        }
    } catch (IOException e) {
        // Ignore
    } finally {
        // some code omitted
    }
}
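The first part of service() is a lazy "create once, cache in a note" pattern: the servlet-level Request wrapper is built only when the low-level request has no note yet, and is found again on later calls. A minimal sketch of that pattern follows. LowLevelRequest and ServletLevelRequest are hypothetical stand-ins, and the slot index and array size are assumptions for this example rather than values read from Tomcat.

```java
class NotesDemo {
    static final int ADAPTER_NOTES = 1;   // assumed slot index for this sketch

    // Hypothetical stand-in for org.apache.coyote.Request: a small array of notes.
    static class LowLevelRequest {
        private final Object[] notes = new Object[8];
        Object getNote(int pos) { return notes[pos]; }
        void setNote(int pos, Object value) { notes[pos] = value; }
    }

    // Hypothetical stand-in for the connector-level Request wrapper.
    static class ServletLevelRequest {
        final LowLevelRequest coyoteRequest;
        ServletLevelRequest(LowLevelRequest coyoteRequest) {
            this.coyoteRequest = coyoteRequest;
        }
    }

    // Returns the cached wrapper, creating and storing it on first use.
    static ServletLevelRequest adapt(LowLevelRequest req) {
        ServletLevelRequest request = (ServletLevelRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            request = new ServletLevelRequest(req);
            req.setNote(ADAPTER_NOTES, request);
        }
        return request;
    }

    public static void main(String[] args) {
        LowLevelRequest req = new LowLevelRequest();
        System.out.println(adapt(req) == adapt(req));   // same wrapper both times
    }
}
```

Because the low-level request object is reused for later requests handled by the same processor, caching the wrapper in a note avoids allocating a new pair of wrappers for every request.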
Request preprocessing
The postParseRequest method preprocesses the request: for example, it strips path parameters (introduced by semicolons) from the path, decodes the URI, and normalizes it (resolving single-dot and double-dot segments).
protected boolean postParseRequest(org.apache.coyote.Request req, Request request,
        org.apache.coyote.Response res, Response response) throws IOException, ServletException {
    // part of the code omitted
    MessageBytes decodedURI = req.decodedURI();
    if (undecodedURI.getType() == MessageBytes.T_BYTES) {
        // Copy the raw URI to the decodedURI
        decodedURI.duplicate(undecodedURI);
        // Parse the path parameters. This will:
        //   - strip out the path parameters
        //   - convert the decodedURI to bytes
        parsePathParameters(req, request);
        // URI decoding
        // %xx decoding of the URL
        try {
            req.getURLDecoder().convert(decodedURI, false);
        } catch (IOException ioe) {
            res.setStatus(400);
            res.setMessage("Invalid URI: " + ioe.getMessage());
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Normalization
        if (!normalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Character decoding
        convertURI(decodedURI, request);
        // Check that the URI is still normalized
        if (!checkNormalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI character encoding");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
    } else {
        /* The URI is chars or String, and has been sent using an in-memory
         * protocol handler. The following assumptions are made:
         * - req.requestURI() has been set to the 'original' non-decoded,
         *   non-normalized URI
         * - req.decodedURI() has been set to the decoded, normalized form
         *   of req.requestURI()
         */
        decodedURI.toChars();
        // Remove all path parameters; any needed path parameter should be set
        // using the request object rather than passing it in the URL
        CharChunk uriCC = decodedURI.getCharChunk();
        int semicolon = uriCC.indexOf(';');
        if (semicolon > 0) {
            decodedURI.setChars
                (uriCC.getBuffer(), uriCC.getStart(), semicolon);
        }
    }
    // Request mapping.
    MessageBytes serverName;
    if (connector.getUseIPVHosts()) {
        serverName = req.localName();
        if (serverName.isNull()) {
            // well, they did ask for it
            res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null);
        }
    } else {
        serverName = req.serverName();
    }
    // Version for the second mapping loop and
    // Context that we expect to get for that version
    String version = null;
    Context versionContext = null;
    boolean mapRequired = true;
    while (mapRequired) {
        // This will map the latest version by default
        connector.getService().getMapper().map(serverName, decodedURI,
                version, request.getMappingData());
        // part of the code omitted
    }
    // part of the code omitted
}
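Taking the case where the MessageBytes type is T_BYTES as an example:
- The parsePathParameters method strips the path parameters, introduced by semicolons, from the URI;
- req.getURLDecoder() returns a UDecoder instance whose convert method decodes the URI. Decoding here only removes each percent sign and replaces the three-character percent-encoding with the byte whose value is given by the two hexadecimal digits that follow;
- The normalize method normalizes the URI, resolving "." and ".." in the path;
- The convertURI method uses the Connector's uriEncoding attribute to convert the URI's bytes into characters;
- Note the line connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData()). Earlier, when the Service started, MapperListener registered every Host and Context in that Service. When selecting a Context by URI, Mapper's map method compares the URI produced by convertURI against the path of each Context; see the Mapper class and the Context configuration documentation.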
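The first four steps above can be imitated with plain JDK classes. The following is a rough sketch under stated assumptions, not Tomcat's implementation: every method name here is hypothetical, the input is assumed to be ASCII, UTF-8 is assumed as the URI encoding, and this normalize also collapses empty segments and trailing slashes, which the real code may treat differently.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

class UriPreprocessDemo {
    // Step 1: drop ";name=value" path parameters from every segment.
    static String stripPathParameters(String uri) {
        StringBuilder out = new StringBuilder();
        boolean skipping = false;
        for (char c : uri.toCharArray()) {
            if (c == ';') {
                skipping = true;
            } else if (c == '/') {
                skipping = false;
            }
            if (!skipping) {
                out.append(c);
            }
        }
        return out.toString();
    }

    // Step 2: replace each %xx with the single byte it encodes.
    static byte[] percentDecode(String uri) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < uri.length(); i++) {
            char c = uri.charAt(i);
            if (c != '%') {
                out.write(c);
                continue;
            }
            if (i + 2 >= uri.length()) {
                throw new IllegalArgumentException("truncated escape");
            }
            int hi = Character.digit(uri.charAt(i + 1), 16);
            int lo = Character.digit(uri.charAt(i + 2), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("invalid escape");
            }
            out.write(hi * 16 + lo);
            i += 2;
        }
        return out.toByteArray();
    }

    // Step 3: resolve "." and ".."; returns null if ".." climbs above the root.
    static String normalize(String path) {
        Deque<String> segments = new ArrayDeque<>();
        for (String segment : path.split("/")) {
            if (segment.isEmpty() || segment.equals(".")) {
                continue;
            }
            if (segment.equals("..")) {
                if (segments.isEmpty()) {
                    return null;
                }
                segments.removeLast();
            } else {
                segments.addLast(segment);
            }
        }
        return "/" + String.join("/", segments);
    }

    // Steps 1 to 4 in the same order as the listing: strip, decode, normalize, to chars.
    static String preprocess(String rawUri) {
        byte[] decoded = percentDecode(stripPathParameters(rawUri));
        // Normalize on a one-byte-per-char view so that bytes pass through unchanged.
        String normalized = normalize(new String(decoded, StandardCharsets.ISO_8859_1));
        if (normalized == null) {
            return null;   // corresponds to the 400 "Invalid URI" branch
        }
        return new String(normalized.getBytes(StandardCharsets.ISO_8859_1),
                StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(preprocess("/app;jsessionid=1/a/../%E4%B8%AD"));
    }
}
```

Percent-decoding yields bytes, not characters, which is why the character conversion is a separate, later step that depends on the configured URI encoding.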
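Container processing
If the request can be passed to the container's Pipeline, that is, when postParseRequest returns true, the container continues the processing. The service method contains the line connector.getService().getContainer().getPipeline().getFirst().invoke(request, response):
- Connector's getService returns the StandardService;
- StandardService's getContainer returns the StandardEngine;
- StandardEngine's getPipeline returns its associated StandardPipeline;
- As mentioned in Tomcat Startup Analysis (7) - Container Components and the Engine Component, the StandardEngine constructor adds the basic valve StandardEngineValve to its own Pipeline. StandardPipeline's getFirst returns the first valve, which handles the request. Because the basic valve is the last one in the chain, the request is eventually handled by the basic valve. See the next article for the rest of the processing flow.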
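The relationship between getFirst and the basic valve can be illustrated with a tiny pipeline. This is a hedged sketch rather than the StandardPipeline source: Valve, NamedValve, and Pipeline are hypothetical miniatures, valves record their names in a list instead of handling a request, and setBasic is assumed to be called before any addValve call.

```java
import java.util.ArrayList;
import java.util.List;

class PipelineDemo {
    // Hypothetical miniature of a valve: do some work, then pass the call on.
    static abstract class Valve {
        Valve next;
        abstract void invoke(List<String> trace);
    }

    static class NamedValve extends Valve {
        final String name;
        NamedValve(String name) { this.name = name; }
        @Override void invoke(List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.invoke(trace);
            }
        }
    }

    static class Pipeline {
        private Valve first;   // head of the ordinary valves, null if there are none
        private Valve basic;   // always the last valve in the chain

        // Assumed to be called once, before any addValve call.
        void setBasic(Valve valve) { basic = valve; }

        // Appends an ordinary valve just in front of the basic valve.
        void addValve(Valve valve) {
            valve.next = basic;
            if (first == null) {
                first = valve;
                return;
            }
            Valve current = first;
            while (current.next != basic) {
                current = current.next;
            }
            current.next = valve;
        }

        // With no ordinary valves, the basic valve is also the first one.
        Valve getFirst() { return first != null ? first : basic; }
    }

    static List<String> run(Pipeline pipeline) {
        List<String> trace = new ArrayList<>();
        pipeline.getFirst().invoke(trace);
        return trace;
    }

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        pipeline.setBasic(new NamedValve("StandardEngineValve"));
        pipeline.addValve(new NamedValve("AccessLogValve"));
        System.out.println(run(pipeline));
    }
}
```

Whatever valves are configured in front of it, the call chain ends at the basic valve, which is where the Engine hands the request on.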