背景
為了提高tomcat線程利用率焦读,避免tomcat連接被打滿增大吞吐量贮缅,準(zhǔn)備在項(xiàng)目中集成Servlet3.0異步請求寄症。
相關(guān)技術(shù)
- 異步請求的基石:Servlet3.0 async servlet技術(shù)
- Spring相關(guān)封裝:@Async古徒,Callable,DeferedResult
Async注解
這個注解容易混淆停蕉,其實(shí)他跟 async servlet 沒什么關(guān)系愕鼓。 被 @Async 注解的方法會由 Spring 使用 TaskExecutor 在新線程中異步執(zhí)行。
Callable谷徙,DeferedResult
這兩個才是 Spring 中實(shí)現(xiàn)異步請求的關(guān)鍵拒啰。
@ResponseBody 注解的方法如果返回 Callable 或 DeferedResult ,Spring 會自動替我們將這個請求轉(zhuǎn)換為異步請求完慧。Callable 和 DeferedResult 比較相似谋旦,不過 DeferedResult 更為強(qiáng)大,Callable 是異步執(zhí)行返回結(jié)果屈尼,而 DeferedResult 更為靈活册着,甚至可以在另外一個請求中放置響應(yīng)結(jié)果,類似與閉包脾歧。
- 如果是簡單的使用場景甲捏,Controller 直接返回 DeferedResult 就可以變成一個異步接口。
- Spring 處理異步請求的核心類是 RequestMappingHandlerAdapter鞭执, 一個容器級的過濾器司顿,想深入研究去追這個類就行。
問題
??Spring 異步請求默認(rèn)不會使用線程池兄纺,雖然經(jīng)過配置優(yōu)化(AsyncSupportConfigurer)可以使用大溜,但這個線程池是整個服務(wù)共享的,不能做到針對接口進(jìn)行線程池隔離估脆,避免量大請求影響量小請求钦奋。
??為了達(dá)到接口級線程隔離的目的,只能自己封裝 Servlet API 來實(shí)現(xiàn)疙赠。大致處理流程如下:
在 Controller 層進(jìn)行 AOP 攔截付材,開啟線程池執(zhí)行切面邏輯
request.startAsync();
threadPool.execute(()->{
...
runController...
writeResponse...
request.complete();
...
})
return null;
??本來這套邏輯跑著挺好,然而當(dāng)集成 Micrometer 進(jìn)行 Http 請求監(jiān)控的時候圃阳,Controller 中的接口只有在 400 狀態(tài)時才被記錄 metrics厌衔。查看 Micrometer 攔截 Http 的代碼:
io.micrometer.spring.web.servlet.WebMvcMetricsFilter.java
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
HandlerExecutionChain handler = null;
try {
MatchableHandlerMapping matchableHandlerMapping = mappingIntrospector.getMatchableHandlerMapping(request);
if (matchableHandlerMapping != null) {
handler = matchableHandlerMapping.getHandler(request);
}
} catch (Exception e) {
logger.debug("Unable to time request", e);
filterChain.doFilter(request, response);
return;
}
final Object handlerObject = handler == null ? null : handler.getHandler();
// If this is the second invocation of the filter in an async request, we don't
// want to start sampling again (effectively bumping the active count on any long task timers).
// Rather, we'll just use the sampling context we started on the first invocation.
TimingSampleContext timingContext = (TimingSampleContext) request.getAttribute(TIMING_SAMPLE);
if (timingContext == null) {
timingContext = new TimingSampleContext(request, handlerObject);
}
try {
filterChain.doFilter(request, response);
if (request.isAsyncSupported()) {
// this won't be "started" until after the first call to doFilter
if (request.isAsyncStarted()) {
request.setAttribute(TIMING_SAMPLE, timingContext);
}
}
if (!request.isAsyncStarted()) {
record(timingContext, response, request,
handlerObject, (Throwable) request.getAttribute(DispatcherServlet.EXCEPTION_ATTRIBUTE));
}
} catch (NestedServletException e) {
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value());
record(timingContext, response, request, handlerObject, e.getCause());
throw e;
}
}
??從代碼中可以看到,如果開啟了異步捍岳,micrometer 并不會記錄此次請求富寿,這是出于什么原因呢? 注意注釋中寫了: If this is the second invocation of the filter in an async request祟同,也就是說一個異步請求會被 WebMvcMetricsFilter 攔截兩次作喘,第一次是異步的,而第二次不是晕城,所以會在第二次被攔截的時候記錄這個異步請求泞坦。
??為了驗(yàn)證這個說法,clone 了 micrometer 的 simple 運(yùn)行砖顷。結(jié)果一個異步請求還真是會被WebMvcMetricsFilter 攔截兩次贰锁,第一次異步赃梧,第二次異步并未打開。
順著 RequestMappingHandlerAdapter 找到異步處理核心類:
org.springframework.web.context.request.async.WebAsyncManager.java
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
throws Exception {
[...]
try {
Future<?> future = this.taskExecutor.submit(() -> {
Object result = null;
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
result = callable.call();
}
catch (Throwable ex) {
result = ex;
}
finally {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
}
setConcurrentResultAndDispatch(result);
});
interceptorChain.setTaskFuture(future);
}
catch (RejectedExecutionException ex) {
Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
setConcurrentResultAndDispatch(result);
throw ex;
}
[...]
}
private void setConcurrentResultAndDispatch(Object result) {
synchronized (WebAsyncManager.this) {
if (this.concurrentResult != RESULT_NONE) {
return;
}
this.concurrentResult = result;
}
[...]
this.asyncWebRequest.dispatch();
}
??原來如此豌熄,Spring 并沒有使用 asyncContext.complete() 來結(jié)束這個異步請求授嘀,而是使用 dispatch(),dispatch() 中也會把這個異步請求標(biāo)記成 Completed 狀態(tài)锣险,所以 WebMvcMetricsFilter 會攔截兩次蹄皱。那就照著這個吧 asyncContext.complete() 換成 asyncContext.dispatch()。
??然而芯肤,事與愿違巷折,雖然代碼一樣,但表現(xiàn)完全不一樣崖咨,asyncContext.complete() 換成 asyncContext.dispatch()后并沒有攔截兩次锻拘,而且這個請求會循環(huán)調(diào)用下去,形成一個死循環(huán)击蹲。仔細(xì)分析后發(fā)現(xiàn)問題:
- Spring DeferedResult 處理流程:
request -> RequestHandlerMappingAdaptor 攔截 -> WebMvcMetricsFilter 攔截(is async) -> dispatch() -> RequestHandlerMappingAdaptor 攔截 -> WebMvcMetricsFilter 攔截(not async) - AOP 封裝 Servlet API的處理流程:
request -> AOP攔截 -> startAsync() -> RequestHandlerMappingAdaptor 攔截 -> WebMvcMetricsFilter 攔截(is async) -> dispatch() -> AOP攔截 -> startAsync() -> RequestHandlerMappingAdaptor 攔截 -> WebMvcMetricsFilter 攔截(is async) -> dispatch() -> ....
就是由于多了AOP的攔截署拟,dispatch 之后依然會開啟新的異步請求,導(dǎo)致在這里死循環(huán)歌豺⊥魄睿看來必須要標(biāo)記請求的執(zhí)行狀態(tài),參考 DeferedResult 的處理世曾,
@Slf4j
public class CustomerAsyncWebRequest extends StandardServletAsyncWebRequest {
private final List<Runnable> errorHandlers = new ArrayList<Runnable>();
public static final String ASYNC_COMPLETED = "com.xx.async.ASYNC_COMPLETED";
/**
* Create a new instance for the given request/response pair.
*
* @param request current HTTP request
* @param response current HTTP response
*/
public CustomerAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) {
super(request, response);
}
@Override
public void onComplete(AsyncEvent event) throws IOException {
super.onComplete(event);
this.getRequest().setAttribute(ASYNC_COMPLETED, true);
}
// 這個在Spring5.x中已經(jīng)支持
@Override
public void onError(AsyncEvent event) throws IOException {
log.error("AsyncListener onError ", event.getThrowable().getMessage());
this.errorHandlers.forEach(Runnable::run);
}
// 這個在Spring5.x中已經(jīng)支持
public void addErrorHandler(Runnable errorHandler){
this.errorHandlers.add(errorHandler);
}
}
AOP邏輯變?yōu)?/p>
if(customerRequest.isAsyncComplete()){
return;
}
request.startAsync();
threadPool.execute(()->{
...
runController...
writeResponse...
// 這里看著像多余缨恒,但是由于AsyncListener的存在谴咸,這個判斷是必不可少的
if(customerRequest.isAsyncComplete()){
return;
}
customerRequest.onComplete(null);
asyncContext.dispatch();
...
})
return null;
至此轮听,改造完畢。所以在使用 micrometer 進(jìn)行監(jiān)控的時候岭佳,如果自己使用 Servlet API 實(shí)現(xiàn)異步請求血巍,不能使用 complete() 來結(jié)束,必須使用 dispatch() 結(jié)束請求珊随,micrometer 才會記錄述寡。