Springboot線程池多任務(wù)阻塞等待結(jié)果(兩種實(shí)現(xiàn)方式)

場(chǎng)景##

公司一站通翻譯同步系統(tǒng) , 需要將客戶保存在美國(guó)站點(diǎn)的文章翻譯并保存到其余22個(gè)站點(diǎn)的數(shù)據(jù)庫。由于翻譯需要耗費(fèi)較長(zhǎng)的時(shí)間,故而使用隊(duì)列將任務(wù)投遞到線程池中處理呼奢,

  1. Springboot 配置產(chǎn)品同步核心線程池
package cn.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;

@Configuration
@EnableAsync // 開啟線程池
public class AsyncThreadPoolConfiguration
{
    /**
     * 配置默認(rèn)線程池化戳,用于處理一些公共異步任務(wù)
     */
    @Bean("defaultThread")
    public Executor defaultThread(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);// 核心線程數(shù),
        executor.setMaxPoolSize(40);// 并發(fā)線程的數(shù)量限制為2
        executor.setQueueCapacity(200); // 線程隊(duì)列
        executor.setThreadNamePrefix("defaultThread@");
        executor.initialize();
        return executor;
    }

    /**
     * 同步產(chǎn)品使用的線程池
     * @return
     */
    @Bean("syncProduct")
    public Executor syncProduct() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(22);// 核心線程數(shù),
        executor.setMaxPoolSize(22);// 并發(fā)線程的數(shù)量限制為2
        executor.setQueueCapacity(100); // 線程隊(duì)列
        executor.setThreadNamePrefix("syncProduct@");
        executor.initialize();
        return executor;
    }

    /**
     * todo 配置其他功能的線程池
     */
}

2.1 使用方式一 注入線程池對(duì)象 ,通過 lomda表達(dá)式結(jié)合Future實(shí)現(xiàn)

package cn.services;

import cn.utils.SystemUtils;
import cn.utils.TimeUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

@Service
public class IndexServices
{
    // 注入線程池對(duì)象
    @Resource(name = "syncProduct")
    private ThreadPoolTaskExecutor syncProduct;

    public void ansyc() throws InterruptedException {
        // 初始化任務(wù)結(jié)果集
        Map<String,Future<String>> futures = new HashMap<>();
        // 模擬需要同步的 22個(gè)子站點(diǎn)
        List<String> langs = new ArrayList<>();
        for (int i=1;i<=22;i++){
            langs.add("lang"+i);
        }
        // 多線程處理任務(wù)
        for (String lang : langs) {
            Future<String> future = syncProduct.submit(() -> {
                // 真正的業(yè)務(wù)處理
                if(lang.equals("lang10")){
                    Thread.sleep(8000);// 模擬阻塞時(shí)間
                }else{
                    Thread.sleep(3000); // 模擬阻塞時(shí)間
                }

                System.out.println("打印結(jié)果 : " + lang + "當(dāng)前線程名稱是:"+SystemUtils.getThreadID());
                return lang + "...";
            });
            futures.put(lang,future);
        }

        // 阻塞等待結(jié)果单料,缺點(diǎn)該種方式無法設(shè)置阻塞等待有效時(shí)間,如果有一個(gè)線程阻塞,會(huì)導(dǎo)致整個(gè)線程池一直等待
        for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
           while (true){
               Future<String> future = entry.getValue();
               String lang = entry.getKey();
               if(future.isDone() && !future.isCancelled()){
                   String result = null;
                   try{
                       result = future.get();
                        //result = future.get(6L,TimeUnit.SECONDS); 設(shè)置阻塞等待時(shí)間無效
                   }catch(ExecutionException e){
                       e.printStackTrace();
                   }
                   System.out.println(lang+" 站點(diǎn)任務(wù)結(jié)果result=" + result + "獲取完成!" + TimeUtils.getCurretDate());
                   break;
               }else{
                   Thread.sleep(1);//每次輪詢休息1毫秒(CPU納秒級(jí))点楼,避免CPU高速輪循耗空CPU 扫尖,這個(gè)至關(guān)重要
               }
           }
        }
        /*
       // 設(shè)置阻塞時(shí)間有效版本
        for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
           while (true){
               Future<String> future = entry.getValue();
               String lang = entry.getKey();
               String result = null;
               try{
                   result = future.get(3L,TimeUnit.SECONDS);
               }catch(ExecutionException e){
                   e.printStackTrace();
               }catch (TimeoutException e){
                   System.out.println("存在超時(shí)的任務(wù),無法獲取結(jié)果");
               }
               System.out.println(lang+" 站點(diǎn)任務(wù)結(jié)果result=" + result + "獲取完成!" + TimeUtils.getCurretDate());
               break;
           }

        }
        */
    }
}

2.2 上線方式二,通過Springboot的Ansyc注解實(shí)現(xiàn)任務(wù)并發(fā)
實(shí)現(xiàn)思路是 寫一個(gè) @Ansyc 注解標(biāo)記的業(yè)務(wù)方法掠廓,在外層循環(huán)調(diào)用换怖,通過參數(shù)控制實(shí)現(xiàn)不同的邏輯

/**
     * 翻譯英文站點(diǎn)文章 并同步到22個(gè)子站點(diǎn)
     */
    // 業(yè)務(wù)核心
    @Async("syncProduceToSite")
    public CompletableFuture<String> syncProduceToSite(String lang) throws InterruptedException {
        // 通過 參數(shù) lang的不同,實(shí)現(xiàn)翻譯成不同的語言蟀瞧,并推送到不同站點(diǎn)的數(shù)據(jù)庫
        Thread.sleep(8000L);

        String results = lang + " success";
        String name =  SystemUtils.getCurrentThreadName();
        return CompletableFuture.completedFuture("站點(diǎn)"+lang + "處理的結(jié)果是:"+results+";線程名稱是:"+name);
    }

在外層調(diào)用過程
 // 模擬準(zhǔn)備 22個(gè)站點(diǎn)的語言標(biāo)識(shí)符參數(shù)
        List<String> langs = new ArrayList<>();
        for (int i=1;i<=22;++i){
            langs.add("lang"+i);
        }
        // 初始化結(jié)果集
        Map<String,CompletableFuture<String>> futureMap = new HashMap<>();
        for (String lang: langs) {
            // 業(yè)務(wù)執(zhí)行
            CompletableFuture<String> future = authServices.syncProduceToSite(lang);
            futureMap.put(lang,future); // 將結(jié)果集放入 map

        }
        // 獲取結(jié)果集
        for (Map.Entry<String, CompletableFuture<String>> entry : futureMap.entrySet()){
            String lang = entry.getKey();
            CompletableFuture<String> future = entry.getValue();

            try {
//                CompletableFuture.anyOf(future).join(); // 阻塞等待結(jié)果的返回,加上這一句沉颂,那么下面的阻塞時(shí)間就無效
                String result= future.get(2L, TimeUnit.SECONDS);
                System.out.println("站點(diǎn):"+lang+" 得到結(jié)果啦 "+result);
            } catch (TimeoutException e) {
//                e.printStackTrace();
                System.out.println("站點(diǎn):"+lang+" 超時(shí),無法獲取結(jié)果");
            }
        }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市悦污,隨后出現(xiàn)的幾起案子铸屉,更是在濱河造成了極大的恐慌,老刑警劉巖切端,帶你破解...
    沈念sama閱讀 217,826評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件彻坛,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡踏枣,警方通過查閱死者的電腦和手機(jī)昌屉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來椰于,“玉大人怠益,你說我怎么就攤上這事●觯” “怎么了蜻牢?”我有些...
    開封第一講書人閱讀 164,234評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵烤咧,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我抢呆,道長(zhǎng)煮嫌,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,562評(píng)論 1 293
  • 正文 為了忘掉前任抱虐,我火速辦了婚禮昌阿,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘恳邀。我一直安慰自己懦冰,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評(píng)論 6 392
  • 文/花漫 我一把揭開白布谣沸。 她就那樣靜靜地躺著刷钢,像睡著了一般。 火紅的嫁衣襯著肌膚如雪乳附。 梳的紋絲不亂的頭發(fā)上内地,一...
    開封第一講書人閱讀 51,482評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音赋除,去河邊找鬼阱缓。 笑死,一個(gè)胖子當(dāng)著我的面吹牛举农,可吹牛的內(nèi)容都是我干的荆针。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼并蝗,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼祭犯!你這毒婦竟也來了秸妥?” 一聲冷哼從身側(cè)響起滚停,我...
    開封第一講書人閱讀 39,166評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎粥惧,沒想到半個(gè)月后键畴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,608評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡突雪,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評(píng)論 3 336
  • 正文 我和宋清朗相戀三年起惕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片咏删。...
    茶點(diǎn)故事閱讀 39,926評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡惹想,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出督函,到底是詐尸還是另有隱情嘀粱,我是刑警寧澤激挪,帶...
    沈念sama閱讀 35,644評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站锋叨,受9級(jí)特大地震影響垄分,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜娃磺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評(píng)論 3 329
  • 文/蒙蒙 一薄湿、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧偷卧,春花似錦豺瘤、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至蛇更,卻和暖如春瞻赶,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背派任。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工砸逊, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人掌逛。 一個(gè)月前我還...
    沈念sama閱讀 48,063評(píng)論 3 370
  • 正文 我出身青樓师逸,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親豆混。 傳聞我的和親對(duì)象是個(gè)殘疾皇子篓像,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容