springBoot結(jié)合線程池解決多線程問題實(shí)錄

背景:

線上有一個(gè)接口沸停,3臺機(jī)器總共QPS在3000左右,單機(jī)QPS在1000左右疾宏,接口響應(yīng)時(shí)間2ms张足。為了保證接口的任何改動在上線之前能夠在大流量下能夠沒有問題。提出想法坎藐,搭建一套流量回放環(huán)境为牍,上線之前把代碼先部署到流量回放環(huán)境。然后將線上的流量導(dǎo)入到流量回放環(huán)境岩馍,用真實(shí)的業(yè)務(wù)請求來做模擬測試碉咆,這個(gè)過程我們稱作是流量回放。

? 為了保證流量回放的時(shí)候蛀恩,流量導(dǎo)入過程吟逝,不能影響正常的線上接口請求,也就是響應(yīng)時(shí)間不能有變化赦肋。首先就要考慮啟動一個(gè)線程來異步處理這個(gè)事情块攒。好,按照這個(gè)想法佃乘,寫了第一版本代碼囱井。(以下代碼是有問題的,只怪我too young too sample)

第一版本代碼寫起來:

@RequestMapping(value = "/flags", method = RequestMethod.POST)
@ResponseBody
public ServerResponse getFlagsPost(@RequestBody NewServerParam param) {
    //如果流量回放開關(guān)打開趣避,就創(chuàng)建線程庞呕,將請求發(fā)送到流量回放環(huán)境
    if(Constant.TRAFFIC_REPLAY_FLAG){
        Runnable r = ()->{
            HttpClientUtil.post(trafficReplayUrl+"sample/v2/flags", JSON.toJSONString(param),10000,"application/json");
        };
        Thread thread = new Thread(r);
        thread.start();
    }
    long a = System.currentTimeMillis();
    ServerResponse response = new ServerResponse();
    response.setFlags(serverService.getFlags(param, param.getExp_key()));
    long b = System.currentTimeMillis();
    LOGGER.info(logService.parseToJSon(param, response, (b - a), LOGGER, Level.INFO.getName()));
    return response;
}

線下測試沒有問題,一上線程帕,大流量上來住练,服務(wù)器瞬間報(bào)錯(cuò)上萬條,馬上回滾愁拭。

報(bào)錯(cuò)提示:

? java.lang.OutOfMemoryError: Unable to create new native thread

這個(gè)錯(cuò)誤的意思是:程序創(chuàng)建的線程數(shù)量已達(dá)到上限值

剖析錯(cuò)誤

? JVM向操作系統(tǒng)申請創(chuàng)建新的 native thread(原生線程)時(shí), 就有可能會碰到 java.lang.OutOfMemoryError: Unable to create new native thread 錯(cuò)誤. 如果底層操作系統(tǒng)創(chuàng)建新的 native thread 失敗, JVM就會拋出相應(yīng)的OutOfMemoryError. 原生線程的數(shù)量受到具體環(huán)境的限制, 通過一些測試用例可以找出這些限制, 請參考下文的示例. 但總體來說, 導(dǎo)致 java.lang.OutOfMemoryError: Unable to create new native thread 錯(cuò)誤的場景大多經(jīng)歷以下這些階段:

  1. java程序向jvm申請創(chuàng)建一個(gè)線程

  2. jvm本地代碼(native code)代理該請求讲逛,嘗試創(chuàng)建一個(gè)操作系統(tǒng)級別的native Thread(原生線程)

  3. 操作系統(tǒng)嘗試創(chuàng)建一個(gè)新的native Thread,需要同時(shí)分配一些內(nèi)存給該線程

  4. 如果操作系統(tǒng)的虛擬內(nèi)存已經(jīng)耗盡岭埠,或者受到32位進(jìn)程的地址空間限制(約2-4GB)盏混,OS就會拒絕本地內(nèi)存分配

  5. JVM拋出 java.lang.OutOfMemoryError: Unable to create new native thread 錯(cuò)誤。

    參考:https://blog.csdn.net/renfufei/article/details/78088553

改進(jìn)方案-springBoot整合線程池優(yōu)化

? 錯(cuò)誤很明顯惜论,就是創(chuàng)建線程數(shù)量過多许赃,超過OS所能允許的最大空間。那這個(gè)問題馆类,完全就可以用線程池去解決混聊,用線程池維護(hù)一定數(shù)量的線程,防止無限制的創(chuàng)建線程乾巧,帶來的內(nèi)存開銷過大句喜。

代碼改進(jìn):

? 1. 創(chuàng)建線程池配置類

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;


/**
 * @description: 線程池配置類
 **/
@Configuration  //表示這個(gè)類是配置類
@EnableAsync    //表示這個(gè)類是線程池配置類
public class ExecutorConfig {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
    @Bean
    public Executor asyncServiceExecutor(){
        logger.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        //配置核心線程數(shù)
        executor.setCorePoolSize(corePoolSize*2);
        //配置最大線程數(shù)
        executor.setMaxPoolSize(corePoolSize*2);
        //配置隊(duì)列大小
        executor.setQueueCapacity(99999);
        //配置線程池中的線程的名稱前綴
        executor.setThreadNamePrefix("async-service-");
        // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候僵闯,并且隊(duì)列已經(jīng)滿了,如何處理新任務(wù)
        // CALLER_RUNS:不在新線程中執(zhí)行任務(wù)藤滥,而是有調(diào)用者所在的線程來執(zhí)行
        //DiscardPolicy: 直接丟棄
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        //執(zhí)行初始化
        executor.initialize();
        // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }

}

? 2.創(chuàng)建線程信息打印的類鳖粟,這樣在執(zhí)行線程池執(zhí)行excute方法的時(shí)候,會把當(dāng)前的任務(wù)的情況打印出來

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @description: 獲取線程池的監(jiān)控信息
 **/
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

    private void showThreadPoolInfo(String prefix){
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
        if(null == threadPoolExecutor){
            return;
        }
        logger.info("{},{},taskCount[{}],completedTaskCount[{}],activeCount[{}],queuesize[{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size()
                );
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }


}
  1. 創(chuàng)建任務(wù)接口
```java
import com.alibaba.fastjson.JSONObject;

/**
 * 異步任務(wù)接口
 */
public interface AsyncService {

    void trfficRepalyForFlagV2(String param);

}
```
  1. 創(chuàng)建任務(wù)實(shí)現(xiàn)類

    ``

    import com.alibaba.fastjson.JSONObject;
    import com.lianjia.platform.sampling.util.HttpClientUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    
    
    /**
     * @description: 異步任務(wù)實(shí)現(xiàn)類
     **/
    @Service
    public class AsyncServiceImpl implements AsyncService {
        private final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
    
        @Value("${URL.trafficReplayUrl}")
        private String trafficReplayUrl; //獲取yml文件中配置的流量回放環(huán)境的URL
    
        @Override
        @Async("asyncServiceExecutor")//這里要使用定義的線程池配置的Bean的方法名
        public void trfficRepalyForFlagV2(String param) {
            HttpClientUtil.post(trafficReplayUrl+"sample/v2/flags", param,10000,"application/json");
        }
    
    }
    
  2. 使用任務(wù)

```java
@RequestMapping(value = "/flags", method = RequestMethod.POST)
@ResponseBody
public ServerResponse getFlagsPost(@RequestBody NewServerParam param) {
    //如果流量回放開關(guān)打開拙绊,就創(chuàng)建線程向图,將請求發(fā)送到流量回放環(huán)境
    if(Constant.TRAFFIC_REPLAY_FLAG){
        asyncService.trfficRepalyForFlagV2(JSON.toJSONString(param));
    }
    long a = System.currentTimeMillis();
    ServerResponse response = new ServerResponse();
    response.setFlags(serverService.getFlags(param, param.getExp_key()));
    long b = System.currentTimeMillis();
    LOGGER.info(logService.parseToJSon(param, response, (b - a), LOGGER, Level.INFO.getName()));
    return response;
}
```

壓測結(jié)果:

? 壓測數(shù)據(jù):并發(fā)數(shù)50,壓測時(shí)間10min标沪。并發(fā)數(shù)=QPS(1000)*響應(yīng)時(shí)間(0.02s)榄攀。

? 之前因?yàn)樯暇€之前沒有做壓測,導(dǎo)致了上線之后金句,大流量下報(bào)錯(cuò)檩赢。吃一塹,長一智违寞。這次改完之后做了壓測贞瞒。壓測之后,第一趁曼,打開流量開關(guān)之后军浆,不報(bào)錯(cuò)了;第二挡闰,主線程平均響應(yīng)耗時(shí)和不開啟異步任務(wù)時(shí)候的平均響應(yīng)耗時(shí)基本一致乒融。證明方案是可以的。

插曲:拒絕策略使用不當(dāng)導(dǎo)致主線程平均響應(yīng)時(shí)間非常大摄悯。第一次在寫線程池配置類的時(shí)候赞季,使用了executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());的拒絕策略。隊(duì)列的大小設(shè)置了3000奢驯,在壓測的時(shí)候發(fā)現(xiàn)并發(fā)數(shù)50(一般計(jì)算是并發(fā)數(shù)=QPS * 響應(yīng)時(shí)間)左右申钩,這個(gè)和線上單臺機(jī)器的QPS基本接近,雖然不報(bào)錯(cuò)了主線程的接口耗時(shí)遠(yuǎn)遠(yuǎn)超出了不打開開關(guān)的接口耗時(shí)叨橱。通過打印信息來看典蜕,是因?yàn)樘峤坏娜蝿?wù)量非常大,隊(duì)列中的任務(wù)已經(jīng)把隊(duì)列填滿了罗洗,這個(gè)時(shí)候,從線程池原理來看钢猛,要去創(chuàng)建線程數(shù)達(dá)到maxpoolsize伙菜,我們這里設(shè)置的maxPoolsize和corePoolsize大小是一樣的。意味著就不會再去創(chuàng)建線程了命迈,只能走拒絕策略贩绕。這里的拒絕策略CallerRunsPolicy的含義是如果異步線程執(zhí)行不了火的,就由調(diào)用線程處理,實(shí)際上就是主線程來處理淑倾,這樣就會導(dǎo)致主線程的部分流量回放任務(wù)成了同步的了馏鹤。這當(dāng)然會增大主線程的接口響應(yīng)時(shí)間了。因?yàn)槲覀冎恍枰胁糠志€上流量了其實(shí)就可以了娇哆,因此湃累,我把拒絕策略改為了直接丟棄。這樣處理不了的線程不進(jìn)入隊(duì)列碍讨,也不由主線程執(zhí)行治力,保證主線程的響應(yīng)時(shí)間不受影響。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末勃黍,一起剝皮案震驚了整個(gè)濱河市宵统,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌覆获,老刑警劉巖马澈,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異弄息,居然都是意外死亡箭券,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進(jìn)店門疑枯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來辩块,“玉大人,你說我怎么就攤上這事荆永》贤ぃ” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵具钥,是天一觀的道長豆村。 經(jīng)常有香客問我,道長骂删,這世上最難降的妖魔是什么掌动? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮宁玫,結(jié)果婚禮上粗恢,老公的妹妹穿的比我還像新娘。我一直安慰自己欧瘪,他們只是感情好眷射,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般妖碉。 火紅的嫁衣襯著肌膚如雪涌庭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天欧宜,我揣著相機(jī)與錄音坐榆,去河邊找鬼。 笑死冗茸,一個(gè)胖子當(dāng)著我的面吹牛席镀,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蚀狰,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼愉昆,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了麻蹋?” 一聲冷哼從身側(cè)響起跛溉,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎扮授,沒想到半個(gè)月后芳室,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡刹勃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年堪侯,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片荔仁。...
    茶點(diǎn)故事閱讀 40,102評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡伍宦,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出乏梁,到底是詐尸還是另有隱情次洼,我是刑警寧澤,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布遇骑,位于F島的核電站卖毁,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏落萎。R本人自食惡果不足惜亥啦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望练链。 院中可真熱鬧翔脱,春花似錦、人聲如沸兑宇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽隶糕。三九已至瓷产,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間枚驻,已是汗流浹背濒旦。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留再登,地道東北人尔邓。 一個(gè)月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像锉矢,于是被迫代替她去往敵國和親梯嗽。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評論 2 355