背景:
線上有一個(gè)接口沸停,3臺機(jī)器總共QPS在3000左右,單機(jī)QPS在1000左右疾宏,接口響應(yīng)時(shí)間2ms张足。為了保證接口的任何改動在上線之前能夠在大流量下能夠沒有問題。提出想法坎藐,搭建一套流量回放環(huán)境为牍,上線之前把代碼先部署到流量回放環(huán)境。然后將線上的流量導(dǎo)入到流量回放環(huán)境岩馍,用真實(shí)的業(yè)務(wù)請求來做模擬測試碉咆,這個(gè)過程我們稱作是流量回放。
? 為了保證流量回放的時(shí)候蛀恩,流量導(dǎo)入過程吟逝,不能影響正常的線上接口請求,也就是響應(yīng)時(shí)間不能有變化赦肋。首先就要考慮啟動一個(gè)線程來異步處理這個(gè)事情块攒。好,按照這個(gè)想法佃乘,寫了第一版本代碼囱井。(以下代碼是有問題的,只怪我too young too sample)
第一版本代碼寫起來:
@RequestMapping(value = "/flags", method = RequestMethod.POST)
@ResponseBody
public ServerResponse getFlagsPost(@RequestBody NewServerParam param) {
//如果流量回放開關(guān)打開趣避,就創(chuàng)建線程庞呕,將請求發(fā)送到流量回放環(huán)境
if(Constant.TRAFFIC_REPLAY_FLAG){
Runnable r = ()->{
HttpClientUtil.post(trafficReplayUrl+"sample/v2/flags", JSON.toJSONString(param),10000,"application/json");
};
Thread thread = new Thread(r);
thread.start();
}
long a = System.currentTimeMillis();
ServerResponse response = new ServerResponse();
response.setFlags(serverService.getFlags(param, param.getExp_key()));
long b = System.currentTimeMillis();
LOGGER.info(logService.parseToJSon(param, response, (b - a), LOGGER, Level.INFO.getName()));
return response;
}
線下測試沒有問題,一上線程帕,大流量上來住练,服務(wù)器瞬間報(bào)錯(cuò)上萬條,馬上回滾愁拭。
報(bào)錯(cuò)提示:
? java.lang.OutOfMemoryError: Unable to create new native thread
這個(gè)錯(cuò)誤的意思是:程序創(chuàng)建的線程數(shù)量已達(dá)到上限值
剖析錯(cuò)誤
? JVM向操作系統(tǒng)申請創(chuàng)建新的 native thread(原生線程)時(shí), 就有可能會碰到 java.lang.OutOfMemoryError: Unable to create new native thread 錯(cuò)誤. 如果底層操作系統(tǒng)創(chuàng)建新的 native thread 失敗, JVM就會拋出相應(yīng)的OutOfMemoryError. 原生線程的數(shù)量受到具體環(huán)境的限制, 通過一些測試用例可以找出這些限制, 請參考下文的示例. 但總體來說, 導(dǎo)致 java.lang.OutOfMemoryError: Unable to create new native thread 錯(cuò)誤的場景大多經(jīng)歷以下這些階段:
java程序向jvm申請創(chuàng)建一個(gè)線程
jvm本地代碼(native code)代理該請求讲逛,嘗試創(chuàng)建一個(gè)操作系統(tǒng)級別的native Thread(原生線程)
操作系統(tǒng)嘗試創(chuàng)建一個(gè)新的native Thread,需要同時(shí)分配一些內(nèi)存給該線程
如果操作系統(tǒng)的虛擬內(nèi)存已經(jīng)耗盡岭埠,或者受到32位進(jìn)程的地址空間限制(約2-4GB)盏混,OS就會拒絕本地內(nèi)存分配
-
JVM拋出 java.lang.OutOfMemoryError: Unable to create new native thread 錯(cuò)誤。
改進(jìn)方案-springBoot整合線程池優(yōu)化
? 錯(cuò)誤很明顯惜论,就是創(chuàng)建線程數(shù)量過多许赃,超過OS所能允許的最大空間。那這個(gè)問題馆类,完全就可以用線程池去解決混聊,用線程池維護(hù)一定數(shù)量的線程,防止無限制的創(chuàng)建線程乾巧,帶來的內(nèi)存開銷過大句喜。
代碼改進(jìn):
? 1. 創(chuàng)建線程池配置類
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @description: 線程池配置類
**/
@Configuration //表示這個(gè)類是配置類
@EnableAsync //表示這個(gè)類是線程池配置類
public class ExecutorConfig {
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Bean
public Executor asyncServiceExecutor(){
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
int corePoolSize = Runtime.getRuntime().availableProcessors();
//配置核心線程數(shù)
executor.setCorePoolSize(corePoolSize*2);
//配置最大線程數(shù)
executor.setMaxPoolSize(corePoolSize*2);
//配置隊(duì)列大小
executor.setQueueCapacity(99999);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("async-service-");
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候僵闯,并且隊(duì)列已經(jīng)滿了,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù)藤滥,而是有調(diào)用者所在的線程來執(zhí)行
//DiscardPolicy: 直接丟棄
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//執(zhí)行初始化
executor.initialize();
// 等待所有任務(wù)結(jié)束后再關(guān)閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
? 2.創(chuàng)建線程信息打印的類鳖粟,這樣在執(zhí)行線程池執(zhí)行excute方法的時(shí)候,會把當(dāng)前的任務(wù)的情況打印出來
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @description: 獲取線程池的監(jiān)控信息
**/
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
private void showThreadPoolInfo(String prefix){
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if(null == threadPoolExecutor){
return;
}
logger.info("{},{},taskCount[{}],completedTaskCount[{}],activeCount[{}],queuesize[{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size()
);
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
- 創(chuàng)建任務(wù)接口
```java
import com.alibaba.fastjson.JSONObject;
/**
* 異步任務(wù)接口
*/
public interface AsyncService {
void trfficRepalyForFlagV2(String param);
}
```
-
創(chuàng)建任務(wù)實(shí)現(xiàn)類
``
import com.alibaba.fastjson.JSONObject; import com.lianjia.platform.sampling.util.HttpClientUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; /** * @description: 異步任務(wù)實(shí)現(xiàn)類 **/ @Service public class AsyncServiceImpl implements AsyncService { private final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class); @Value("${URL.trafficReplayUrl}") private String trafficReplayUrl; //獲取yml文件中配置的流量回放環(huán)境的URL @Override @Async("asyncServiceExecutor")//這里要使用定義的線程池配置的Bean的方法名 public void trfficRepalyForFlagV2(String param) { HttpClientUtil.post(trafficReplayUrl+"sample/v2/flags", param,10000,"application/json"); } }
使用任務(wù)
```java
@RequestMapping(value = "/flags", method = RequestMethod.POST)
@ResponseBody
public ServerResponse getFlagsPost(@RequestBody NewServerParam param) {
//如果流量回放開關(guān)打開拙绊,就創(chuàng)建線程向图,將請求發(fā)送到流量回放環(huán)境
if(Constant.TRAFFIC_REPLAY_FLAG){
asyncService.trfficRepalyForFlagV2(JSON.toJSONString(param));
}
long a = System.currentTimeMillis();
ServerResponse response = new ServerResponse();
response.setFlags(serverService.getFlags(param, param.getExp_key()));
long b = System.currentTimeMillis();
LOGGER.info(logService.parseToJSon(param, response, (b - a), LOGGER, Level.INFO.getName()));
return response;
}
```
壓測結(jié)果:
? 壓測數(shù)據(jù):并發(fā)數(shù)50,壓測時(shí)間10min标沪。并發(fā)數(shù)=QPS(1000)*響應(yīng)時(shí)間(0.02s)榄攀。
? 之前因?yàn)樯暇€之前沒有做壓測,導(dǎo)致了上線之后金句,大流量下報(bào)錯(cuò)檩赢。吃一塹,長一智违寞。這次改完之后做了壓測贞瞒。壓測之后,第一趁曼,打開流量開關(guān)之后军浆,不報(bào)錯(cuò)了;第二挡闰,主線程平均響應(yīng)耗時(shí)和不開啟異步任務(wù)時(shí)候的平均響應(yīng)耗時(shí)基本一致乒融。證明方案是可以的。
插曲:拒絕策略使用不當(dāng)導(dǎo)致主線程平均響應(yīng)時(shí)間非常大摄悯。第一次在寫線程池配置類的時(shí)候赞季,使用了executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());的拒絕策略。隊(duì)列的大小設(shè)置了3000奢驯,在壓測的時(shí)候發(fā)現(xiàn)并發(fā)數(shù)50(一般計(jì)算是并發(fā)數(shù)=QPS * 響應(yīng)時(shí)間)左右申钩,這個(gè)和線上單臺機(jī)器的QPS基本接近,雖然不報(bào)錯(cuò)了主線程的接口耗時(shí)遠(yuǎn)遠(yuǎn)超出了不打開開關(guān)的接口耗時(shí)叨橱。通過打印信息來看典蜕,是因?yàn)樘峤坏娜蝿?wù)量非常大,隊(duì)列中的任務(wù)已經(jīng)把隊(duì)列填滿了罗洗,這個(gè)時(shí)候,從線程池原理來看钢猛,要去創(chuàng)建線程數(shù)達(dá)到maxpoolsize伙菜,我們這里設(shè)置的maxPoolsize和corePoolsize大小是一樣的。意味著就不會再去創(chuàng)建線程了命迈,只能走拒絕策略贩绕。這里的拒絕策略CallerRunsPolicy的含義是如果異步線程執(zhí)行不了火的,就由調(diào)用線程處理,實(shí)際上就是主線程來處理淑倾,這樣就會導(dǎo)致主線程的部分流量回放任務(wù)成了同步的了馏鹤。這當(dāng)然會增大主線程的接口響應(yīng)時(shí)間了。因?yàn)槲覀冎恍枰胁糠志€上流量了其實(shí)就可以了娇哆,因此湃累,我把拒絕策略改為了直接丟棄。這樣處理不了的線程不進(jìn)入隊(duì)列碍讨,也不由主線程執(zhí)行治力,保證主線程的響應(yīng)時(shí)間不受影響。