現(xiàn)象描述
突然客戶群里反饋蹋艺,線上某功能處理出現(xiàn)嚴(yán)重?fù)矶隆T偬幚聿缓镁鸵袚Q渠道剥悟。這個(gè)功能就是一個(gè)通知功能,客戶依賴通知結(jié)果去完成他的業(yè)務(wù)邏輯曼库。但是這個(gè)通知非常緩慢区岗,嚴(yán)重?fù)矶隆?/p>
背景描述
常有這樣一個(gè)需求場(chǎng)景,為了提高請(qǐng)求的吞吐量毁枯,在一個(gè)請(qǐng)求鏈路中某些業(yè)務(wù)邏輯是可以異步執(zhí)行慈缔。實(shí)現(xiàn)方式大體上分為兩種:
- 開(kāi)辟單獨(dú)的線程去處理異步邏輯。
- 引入MQ將異步邏輯發(fā)送到MQ种玛,其他服務(wù)接受到消息后處理藐鹤。
本文討論的是第一種情況瓤檐。Spring 提供了一個(gè)注解@Async 作用就是開(kāi)辟獨(dú)立線程去異步處理。但是在不深入了解注解實(shí)現(xiàn)的情況下使用娱节,往往就造成一些問(wèn)題挠蛉。
一個(gè)業(yè)務(wù)系統(tǒng)使用了@Async 實(shí)現(xiàn)了一個(gè)通知功能,于是出現(xiàn)了上述的現(xiàn)象描述括堤。
代碼是這樣的碌秸。
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000 * 5, multiplier = 3), include = CallbackFailException.class)
@Async
public String doCallback(CallBackMessage callBackMessage) {
......do samething
}
@Retryable 這個(gè)注解的作用是完成重試機(jī)制,當(dāng)執(zhí)行過(guò)程中遇到指定異常類型是觸發(fā)重試悄窃,可以指定重試的次數(shù),重試間隔時(shí)間蹂窖。這個(gè)不是本文的重點(diǎn)不做討論轧抗。
@Retryable 和 @Async 一起使用的目的就是異步的完成通知,如果通知失敗觸發(fā)重試機(jī)制瞬测。
問(wèn)題分析
現(xiàn)象是通知出現(xiàn)了積壓横媚,大量通知阻塞。我們來(lái)看@Async的實(shí)現(xiàn)原理月趟。既然需要開(kāi)辟新線程去執(zhí)行灯蝴,我們看Spring 是如果實(shí)現(xiàn)的。如果不自定義異步方法的線程池孝宗,Spring 默認(rèn)使SimpleAsyncTaskExecutor穷躁,但是這個(gè)線程池不是真的線程池,這個(gè)類不重用線程因妇,每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線程问潭。它會(huì)根據(jù)CPU核心數(shù)設(shè)置一個(gè)最大值,如果超過(guò)這個(gè)值就會(huì)阻塞其他線程婚被。并發(fā)大的時(shí)候會(huì)產(chǎn)生嚴(yán)重的性能問(wèn)題.
相關(guān)源碼:
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
else {
doExecute(taskToUse);
}
}
protected void beforeAccess() {
if (this.concurrencyLimit == NO_CONCURRENCY) {
throw new IllegalStateException(
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
}
if (this.concurrencyLimit > 0) {
boolean debug = logger.isDebugEnabled();
synchronized (this.monitor) {
boolean interrupted = false;
while (this.concurrencyCount >= this.concurrencyLimit) {
if (interrupted) {
throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
"but concurrency limit still does not allow for entering");
}
if (debug) {
logger.debug("Concurrency count " + this.concurrencyCount +
" has reached limit " + this.concurrencyLimit + " - blocking");
}
try {
this.monitor.wait();
}
catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
interrupted = true;
}
}
if (debug) {
logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
}
this.concurrencyCount++;
}
}
}
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
如果異步執(zhí)行的業(yè)務(wù)邏輯耗時(shí)較長(zhǎng)狡忙,則會(huì)出現(xiàn)大量的阻塞,這次線上問(wèn)題就是因?yàn)橥ㄖ前l(fā)給第三方系統(tǒng)址芯,請(qǐng)求響應(yīng)超時(shí)時(shí)間設(shè)置過(guò)長(zhǎng)灾茁,恰好部分客戶服務(wù)出現(xiàn)問(wèn)題導(dǎo)致通知返回時(shí)間非常長(zhǎng),觸發(fā)了重試通知谷炸,重試時(shí)又是相同的問(wèn)題北专。導(dǎo)致大量的通知積壓。
解決方案
- 首先要使用自定義的線程池替換默認(rèn)的 SimpleAsyncTaskExecutor 具體如下:
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(16);
threadPoolTaskExecutor.setMaxPoolSize(32);
threadPoolTaskExecutor.setQueueCapacity(10000);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
這樣@Async就會(huì)使用自定義的線程池淑廊,如果@Async使用很多逗余,還可以定義多個(gè)線程池,然后再指定使用具體的線程池季惩。當(dāng)然你線程池里面可以設(shè)置拒絕的策略录粱,這里就不做討論腻格。
- 其次如果需要異步執(zhí)行的業(yè)務(wù)邏輯非常耗時(shí),不建議使用@Async啥繁,使用MQ去處理菜职。如果異步任務(wù)中需要請(qǐng)求其他的服務(wù),也注意要設(shè)置請(qǐng)求超時(shí)時(shí)間旗闽,以防其他服務(wù)出現(xiàn)異常時(shí)帶崩你的服務(wù)酬核。