Springboot之多任務并行+線程池處理

springboot之多任務并行+線程池處理

最近項目中做到一個關于批量發(fā)短信的業(yè)務,如果用戶量特別大的話,不能使用單線程去發(fā)短信舶治,只能嘗試著使用多任務來完成

截圖20190614000017.png
  • Java 線程池
Java通過Executors提供四種線程池,分別為:

newCachedThreadPool創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要婆排,可靈活回收空閑線程声旺,若無可回收,則新建線程段只。
newFixedThreadPool 創(chuàng)建一個定長線程池腮猖,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待赞枕。
newScheduledThreadPool 創(chuàng)建一個定長線程池澈缺,支持定時及周期性任務執(zhí)行。
newSingleThreadExecutor 創(chuàng)建一個單線程化的線程池炕婶,它只會用唯一的工作線程來執(zhí)行任務姐赡,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。

優(yōu)點
重用存在的線程古话,減少對象創(chuàng)建雏吭、消亡的開銷,性能佳陪踩。
可有效控制最大并發(fā)線程數(shù)杖们,提高系統(tǒng)資源的使用率,同時避免過多資源競爭肩狂,避免堵塞摘完。
提供定時執(zhí)行、定期執(zhí)行傻谁、單線程孝治、并發(fā)數(shù)控制等功能。

  • 方式一 CountDownLatch
public class StatsDemo {
    final static SimpleDateFormat sdf = new SimpleDateFormat(
            "yyyy-MM-dd HH:mm:ss");

    final static String startTime = sdf.format(new Date());

    /**
     * IO密集型任務  = 一般為2*CPU核心數(shù)(常出現(xiàn)于線程中:數(shù)據(jù)庫數(shù)據(jù)交互审磁、文件上傳下載谈飒、網(wǎng)絡數(shù)據(jù)傳輸?shù)鹊龋?     * CPU密集型任務 = 一般為CPU核心數(shù)+1(常出現(xiàn)于線程中:復雜算法)
     * 混合型任務  = 視機器配置和復雜度自測而定
     */
    private static int corePoolSize = Runtime.getRuntime().availableProcessors();
    /**
     * public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
     *                           TimeUnit unit,BlockingQueue<Runnable> workQueue)
     * corePoolSize用于指定核心線程數(shù)量
     * maximumPoolSize指定最大線程數(shù)
     * keepAliveTime和TimeUnit指定線程空閑后的最大存活時間
     * workQueue則是線程池的緩沖隊列,還未執(zhí)行的線程會在隊列中等待
     * 監(jiān)控隊列長度,確保隊列有界
     * 不當?shù)木€程池大小會使得處理速度變慢态蒂,穩(wěn)定性下降杭措,并且導致內(nèi)存泄露。如果配置的線程過少钾恢,則隊列會持續(xù)變大手素,消耗過多內(nèi)存。
     * 而過多的線程又會 由于頻繁的上下文切換導致整個系統(tǒng)的速度變緩——殊途而同歸瘩蚪。隊列的長度至關重要泉懦,它必須得是有界的,這樣如果線程池不堪重負了它可以暫時拒絕掉新的請求疹瘦。
     * ExecutorService 默認的實現(xiàn)是一個無界的 LinkedBlockingQueue崩哩。
     */
    private static ThreadPoolExecutor executor  = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1000));

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        //使用execute方法
          executor.execute(new Stats("任務A", 1000, latch));
          executor.execute(new Stats("任務B", 1000, latch));
          executor.execute(new Stats("任務C", 1000, latch));
          executor.execute(new Stats("任務D", 1000, latch));
          executor.execute(new Stats("任務E", 1000, latch));
        latch.await();// 等待所有人任務結(jié)束
        System.out.println("所有的統(tǒng)計任務執(zhí)行完成:" + sdf.format(new Date()));
    }

    static class Stats implements Runnable  {
        String statsName;
        int runTime;
        CountDownLatch latch;

        public Stats(String statsName, int runTime, CountDownLatch latch) {
            this.statsName = statsName;
            this.runTime = runTime;
            this.latch = latch;
        }

        public void run() {
            try {
                System.out.println(statsName+ " do stats begin at "+ startTime);
                //模擬任務執(zhí)行時間
                Thread.sleep(runTime);
                System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
                latch.countDown();//單次任務結(jié)束,計數(shù)器減一
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

  • 方式二 Future
重點是和springboot整合拱礁,采用注解bean方式生成ThreadPoolTaskExecutor

@Bean

//spring依賴包
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class GlobalConfig {

    /**
     * 默認線程池線程池
     *
     * @return Executor
     */
    @Bean
    public ThreadPoolTaskExecutor defaultThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心線程數(shù)目
        executor.setCorePoolSize(16);
        //指定最大線程數(shù)
        executor.setMaxPoolSize(64);
        //隊列中最大的數(shù)目
        executor.setQueueCapacity(16);
        //線程名稱前綴
        executor.setThreadNamePrefix("defaultThreadPool_");
        //rejection-policy:當pool已經(jīng)達到max size的時候琢锋,如何處理新任務
        //CALLER_RUNS:不在新線程中執(zhí)行任務辕漂,而是由調(diào)用者所在的線程來執(zhí)行
        //對拒絕task的處理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //線程空閑后的最大存活時間
        executor.setKeepAliveSeconds(60);
        //加載
        executor.initialize();
        return executor;
    }
}

使用

//通過注解引入配置
@Resource(name = "defaultThreadPool")
private ThreadPoolTaskExecutor executor;

    //使用Future方式執(zhí)行多任務
    //生成一個集合
    List<Future> futures = new ArrayList<>();

    //獲取后臺全部有效運營人員的集合
    List<AdminUserMsgResponse> adminUserDOList = adminManagerService.GetUserToSentMsg(null);

    for (AdminUserMsgResponse response : adminUserDOList) {
        //并發(fā)處理
        if (response.getMobile() != null) {
            Future<?> future = executor.submit(() -> {
                //發(fā)送短信
                mobileMessageFacade.sendCustomerMessage(response.getMobile(), msgConfigById.getContent());
            });
            futures.add(future);
        }
    }

  //查詢?nèi)蝿請?zhí)行的結(jié)果
   for (Future<?> future : futureList) {
        while (true) {//CPU高速輪詢:每個future都并發(fā)輪循呢灶,判斷完成狀態(tài)然后獲取結(jié)果吴超,這一行,是本實現(xiàn)方案的精髓所在鸯乃。即有10個future在高速輪詢鲸阻,完成一個future的獲取結(jié)果,就關閉一個輪詢
 if (future.isDone()&& !future.isCancelled()) {//獲取future成功完成狀態(tài)缨睡,如果想要限制每個任務的超時時間鸟悴,取消本行的狀態(tài)判斷+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超時異常使用即可。
     Integer i = future.get();//獲取結(jié)果
    System.out.println("任務i="+i+"獲取完成!"+new Date());
    list.add(i);
    break;//當前future獲取結(jié)果完畢奖年,跳出while
} else {
    Thread.sleep(1);//每次輪詢休息1毫秒(CPU納秒級)细诸,避免CPU高速輪循耗空CPU---》新手別忘記這個
}
}
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市陋守,隨后出現(xiàn)的幾起案子震贵,更是在濱河造成了極大的恐慌,老刑警劉巖水评,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件猩系,死亡現(xiàn)場離奇詭異,居然都是意外死亡中燥,警方通過查閱死者的電腦和手機寇甸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來疗涉,“玉大人拿霉,你說我怎么就攤上這事≡劭郏” “怎么了绽淘?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長偏窝。 經(jīng)常有香客問我收恢,道長,這世上最難降的妖魔是什么祭往? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任伦意,我火速辦了婚禮,結(jié)果婚禮上硼补,老公的妹妹穿的比我還像新娘驮肉。我一直安慰自己,他們只是感情好已骇,可當我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布离钝。 她就那樣靜靜地躺著票编,像睡著了一般。 火紅的嫁衣襯著肌膚如雪卵渴。 梳的紋絲不亂的頭發(fā)上慧域,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天,我揣著相機與錄音浪读,去河邊找鬼昔榴。 笑死,一個胖子當著我的面吹牛碘橘,可吹牛的內(nèi)容都是我干的互订。 我是一名探鬼主播,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼痘拆,長吁一口氣:“原來是場噩夢啊……” “哼仰禽!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起纺蛆,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤吐葵,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后犹撒,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體折联,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年识颊,在試婚紗的時候發(fā)現(xiàn)自己被綠了诚镰。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡祥款,死狀恐怖清笨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情刃跛,我是刑警寧澤抠艾,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站桨昙,受9級特大地震影響检号,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蛙酪,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一齐苛、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧桂塞,春花似錦凹蜂、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽汰瘫。三九已至,卻和暖如春擂煞,著一層夾襖步出監(jiān)牢的瞬間混弥,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工颈娜, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留剑逃,地道東北人浙宜。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓官辽,卻偏偏與公主長得像,于是被迫代替她去往敵國和親粟瞬。 傳聞我的和親對象是個殘疾皇子同仆,可洞房花燭夜當晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容