Tomcat 长连接使用
新建AsyncContext,然后将AsyncContext放入到线程池中执行。
// Start asynchronous processing on the request and obtain its AsyncContext.
AsyncContext asyncContext = request.startAsync();
// Register a listener on the async context.
asyncContext.addListener(new AppAsyncListener());
// Override the timeout; `timeout` is defined outside this snippet.
asyncContext.setTimeout(timeout);
// Fetch the thread pool stored under the "executor" servlet-context attribute.
ThreadPoolExecutor executor = (ThreadPoolExecutor) request
.getServletContext().getAttribute("executor");
// Submit the work, together with the async context, to the thread pool.
executor.execute(new AsyncRequestProcessor(asyncContext, workTime));
启动分析
1 获取AsyncContext
org.apache.catalina.connector.Request#startAsync()
/**
 * Starts asynchronous processing for this request and returns its async context.
 *
 * @param request  the servlet request handed to the async context
 * @param response the servlet response handed to the async context
 * @return the async context; it is created on the first call and reused afterwards
 * @throws IllegalStateException if isAsyncSupported() returns false
 */
public AsyncContext startAsync(ServletRequest request,
ServletResponse response) {
if (!isAsyncSupported()) {
throw new IllegalStateException(sm.getString("request.asyncNotSupported"));
}
if (asyncContext == null) {
// Create the async context on first use
asyncContext = new AsyncContextImpl(this);
}
// Mark the context as started. The last argument is true only when the
// caller passed this request's own request/response objects.
asyncContext.setStarted(getContext(), request, response,
request==getRequest() && response==getResponse().getResponse());
// Apply the default timeout taken from the connector
asyncContext.setTimeout(getConnector().getAsyncTimeout());
return asyncContext;
}
org.apache.catalina.core.AsyncContextImpl#setStarted
/**
 * Records the start of an async cycle: sends ASYNC_START to the Coyote request,
 * stores the context/request/response, and fires onStartAsync on the listeners
 * that were registered before this call. All of it runs while holding
 * asyncContextLock.
 *
 * @param context                 the context stored on this async context
 * @param request                 the servlet request stored on this async context
 * @param response                the servlet response stored on this async context
 * @param originalRequestResponse stored as hasOriginalRequestAndResponse
 */
public void setStarted(Context context, ServletRequest request,
ServletResponse response, boolean originalRequestResponse) {
synchronized (asyncContextLock) {
// Key step: send ASYNC_START to the Coyote request
this.request.getCoyoteRequest().action(
ActionCode.ASYNC_START, this);
this.context = context;
this.servletRequest = request;
this.servletResponse = response;
this.hasOriginalRequestAndResponse = originalRequestResponse;
this.event = new AsyncEvent(this, request, response);
// Fire the registered listeners: copy the list, clear the original,
// then notify each listener from the copy
List<AsyncListenerWrapper> listenersCopy = new ArrayList<>();
listenersCopy.addAll(listeners);
listeners.clear();
for (AsyncListenerWrapper listener : listenersCopy) {
try {
listener.fireOnStartAsync(event);
} catch (Throwable t) {
// A failing listener is handed to ExceptionUtils and logged as a warning
ExceptionUtils.handleThrowable(t);
log.warn("onStartAsync() failed for listener of type [" +
listener.getClass().getName() + "]", t);
}
}
}
}
设置异步状态
org.apache.coyote.http11.Http11Processor#action 845行
// Async start
case ASYNC_START: {
// Set the async state to STARTING (via the async state machine)
asyncStateMachine.asyncStart((AsyncContextCallback) param);
break;
}
// Async complete; this case is used again later in the walkthrough
case ASYNC_COMPLETE: {
clearDispatches();
// When asyncComplete() returns true, dispatch an OPEN_READ socket event
if (asyncStateMachine.asyncComplete()) {
socketWrapper.processSocket(SocketEvent.OPEN_READ, true);
}
break;
}
任务提交后的后续处理
从Valve及FilterChain返回后,进入org.apache.catalina.connector.CoyoteAdapter#service。它最主要作用是不关闭request和response。
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container: this is where the Servlet gets invoked
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
}
// Async handling: when the request is in async mode, only set the async flag
if (request.isAsync()) {
async = true;
.....
} else {
// Not async: finish the request and the response right away
request.finishRequest();
response.finishResponse();
}
从CoyoteAdapter返回到Http11Processor
org.apache.coyote.http11.Http11Processor#service
// Choose the socket state to return (the excerpt stops inside the final else branch)
if (getErrorState().isError() || endpoint.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync()) {
// Return value for an async request
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else {
org.apache.coyote.AbstractProtocol.ConnectionHandler#process
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
longPoll(wrapper, processor);
// Add the processor to the collection of waiting processors (async only)
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
}
任务完成
任务完成时需要主动调用
// Must be called explicitly by the task once its work is done.
asyncContext.complete();
org.apache.catalina.core.AsyncContextImpl#complete
/**
 * Completes the asynchronous operation by sending ASYNC_COMPLETE to the
 * Coyote request.
 */
public void complete() {
if (log.isDebugEnabled()) {
logDebug("complete ");
}
// NOTE(review): check() is not shown here; presumably it validates the context state - confirm
check();
// Same mechanism as at start: send an action code to the Coyote request
request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
}
action触发socketWrapper.processSocket(SocketEvent.OPEN_READ, true);
org.apache.tomcat.util.net.AbstractEndpoint#processSocket
触发processor处理请求
/**
 * Runs a socket processor for the given socket and event, either on the
 * executor or directly on the calling thread. The excerpt ends before the
 * catch blocks and the final return of the method.
 *
 * @param socketWrapper the socket to process; when null the method returns false
 * @param event         the socket event to process
 * @param dispatch      when true and an executor is available, the processor is
 *                      submitted to the executor; otherwise it runs inline
 * @return false when socketWrapper is null
 */
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
// Reuse a cached socket processor if one is available, otherwise create one
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
// No dispatch requested or no executor: run on the current thread
sc.run();
}
}
超时处理
超时处理线程
org.apache.coyote.AbstractProtocol.AsyncTimeout
// Timeout-checking loop; keeps going while asyncTimeoutRunning is true.
while (asyncTimeoutRunning) {
// Scan once per second
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Interruption is ignored here
}
long now = System.currentTimeMillis();
// Walk waitingProcessors and let each processor check itself for a timeout
for (Processor processor : waitingProcessors) {
processor.timeoutAsync(now);
}
// While the endpoint is paused, keep sleeping instead of scanning
while (endpoint.isPaused() && asyncTimeoutRunning) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Interruption is ignored here
}
}
}
addWaitingProcessor来源
org.apache.coyote.AbstractProtocol#addWaitingProcessor
/**
 * Adds a processor to waitingProcessors. Called on the servlet-return path.
 *
 * @param processor the processor to add
 */
public void addWaitingProcessor(Processor processor) {
waitingProcessors.add(processor);
}
判断是否超时
org.apache.coyote.AbstractProcessor#timeoutAsync
/**
 * Decides whether the async request has timed out and, if so, triggers
 * timeout handling via doTimeoutAsync().
 *
 * @param now the time to compare against the last async start; a negative
 *            value triggers timeout handling unconditionally
 */
public void timeoutAsync(long now) {
if (now < 0) {
doTimeoutAsync();
} else {
long asyncTimeout = getAsyncTimeout();
// A timeout of zero or less is never treated as expired by this check
if (asyncTimeout > 0) {
long asyncStart = asyncStateMachine.getLastAsyncStart();
if ((now - asyncStart) > asyncTimeout) {
// Trigger timeout handling; this line is a good place for a breakpoint
doTimeoutAsync();
}
}
}
}
参考
https://zhuanlan.zhihu.com/p/22018499
http://www.importnew.com/8864.html
http://blog.csdn.net/wangyangzhizhou/article/details/53207966