SpringBoot中@Scheduled實(shí)現(xiàn)多線(xiàn)程并發(fā)定時(shí)任務(wù)

SpringBoot中@Scheduled實(shí)現(xiàn)多線(xiàn)程并發(fā)定時(shí)任務(wù)

1.背景

  • Spring Boot實(shí)現(xiàn)定時(shí)任務(wù)非常容易钝腺,只需要使用Spring自帶的Schedule注解
@Scheduled(cron = "0 */1 * * * ?")
    public void cancleOrderTask() {
        //實(shí)現(xiàn)業(yè)務(wù)
    }

  • 記得在啟動(dòng)類(lèi)中開(kāi)啟定時(shí)任務(wù)
```
@EnableScheduling //開(kāi)啟定時(shí)任務(wù)

```
  • 定時(shí)任務(wù)開(kāi)啟成功,但所有的任務(wù)都是在同一個(gè)線(xiàn)程池中的同一個(gè)線(xiàn)程來(lái)完成的。在實(shí)際開(kāi)發(fā)過(guò)程中,我們當(dāng)然不希望所有的任務(wù)都運(yùn)行在一個(gè)線(xiàn)程中
[圖片上傳中...(image-75c393-1640765676888-2)]

2.方案解決

方案一:

1:通過(guò)ScheduleConfig配置文件實(shí)現(xiàn)SchedulingConfigurer接口刚梭,并重寫(xiě)setSchedulerfang方法

package com.lds.springbootdemo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Configuration
public class ScheduledConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        scheduledTaskRegistrar.setScheduler(setTaskExecutors());
    }

    @Bean(destroyMethod="shutdown")
    public Executor setTaskExecutors(){
        // 10個(gè)線(xiàn)程來(lái)處理藤为。
        return Executors.newScheduledThreadPool(10);
    }
}

[圖片上傳中...(image-73de4e-1640765676890-3)]

2:創(chuàng)建Bean

package com.example.morningrundata.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class TaskSchedulerConfig {
    //線(xiàn)程池應(yīng)該交給容器管理
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        return scheduler;
    }
}

方案二:

1.@Async異步+線(xiàn)程池的兩種方式

  1. 在啟動(dòng)類(lèi)加上@EnableAsync(不一定是啟動(dòng)類(lèi),可以是controller暮顺、service等啟動(dòng)時(shí)加載)
```
package com.example.worktest.async;

@SpringBootApplication
@EnableAsync
public class AsyncApplication {

  public static void main(String[] args) {
      SpringApplication.run(AsyncApplication.class, args);
  }

}

```
  1. @Async注解,可以在類(lèi)秀存,方法捶码,controller,service
```
package com.example.morningrundata.task;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;


/**
 * 定時(shí)查詢(xún)學(xué)生晨跑記錄
 * @author Administrator
 */
@Component
@Slf4j
@EnableScheduling
@Async
public class TimerProcessTaskTest {


    @Scheduled(cron = "0/2 * * * * ?")
    public void doTask() throws InterruptedException {
        log.info(Thread.currentThread().getName()+"===task run");
        Thread.sleep(5);

    }
    @Scheduled(cron = "0/2 * * * * ?")
    public void doTask1() throws InterruptedException {
        log.info(Thread.currentThread().getName()+"===task end");
    }
}


```



[圖片上傳中...(image-132da2-1640765676888-1)]
  1. 解釋
> @Async異步方法默認(rèn)使用Spring創(chuàng)建ThreadPoolTaskExecutor(參考TaskExecutionAutoConfiguration),
> 
> 其中默認(rèn)核心線(xiàn)程數(shù)為8, 默認(rèn)最大隊(duì)列和默認(rèn)最大線(xiàn)程數(shù)都是Integer.MAX_VALUE. 創(chuàng)建新線(xiàn)程的條件是隊(duì)列填滿(mǎn)時(shí), 而
> 
> 這樣的配置隊(duì)列永遠(yuǎn)不會(huì)填滿(mǎn), 如果有@Async注解標(biāo)注的方法長(zhǎng)期占用線(xiàn)程(比如HTTP長(zhǎng)連接等待獲取結(jié)果),
> 
> **在核心8個(gè)線(xiàn)程數(shù)占用滿(mǎn)了之后, 新的調(diào)用就會(huì)進(jìn)入隊(duì)列, 外部表現(xiàn)為沒(méi)有執(zhí)行.**
> 
> [圖片上傳中...(image-bf6783-1640765676887-0)]
> 
> ```
> 解決:
> 
>     手動(dòng)配置相應(yīng)屬性即可. 比如
>     spring.task.execution.pool.queueCapacity=4
>     spring.task.execution.pool.coreSize=20
> 
> ```
> 
> ```
> 備注: 
> 
>     此處沒(méi)有配置maxSize, 仍是默認(rèn)的Integer.MAX_VALUE. 如果配置的話(huà), 請(qǐng)考慮達(dá)到最大線(xiàn)程數(shù)時(shí)的處理策略(JUC包查找RejectedExecutionHandler的實(shí)現(xiàn)類(lèi))
> 
>     (默認(rèn)為拒絕執(zhí)行AbortPolicy, 即拋出異常)
> 
>     AbortPolicy: 直接拋出java.util.concurrent.RejectedExecutionException異常
> 
>     CallerRunsPolicy: 主線(xiàn)程直接執(zhí)行該任務(wù)或链,執(zhí)行完之后嘗試添加下一個(gè)任務(wù)到線(xiàn)程池中惫恼,可以有效降低向線(xiàn)程池內(nèi)添加任務(wù)的速度
> 
>     DiscardOldestPolicy: 拋棄舊的任務(wù)
> 
>     DiscardPolicy: 拋棄當(dāng)前任務(wù)
>     
>     //更好的解釋
>     AbortPolicy:直接拋出 RejectedExecutionException 異常并阻止系統(tǒng)正常運(yùn)行。
>     CallerRunsPolicy:“調(diào)用者運(yùn)行”機(jī)制澳盐,該策略既不會(huì)拋棄任務(wù)祈纯,也不會(huì)拋出異常,而是將某些任務(wù)回退到調(diào)用者叼耙,由調(diào)用者來(lái)完成任務(wù)腕窥。
>     DiscardOldestPolicy:拋棄隊(duì)列中等待最久的任務(wù),然后把當(dāng)前任務(wù)加入隊(duì)列中嘗試再次提交當(dāng)前任務(wù)筛婉。
>     DiscarePolicy:直接丟棄任務(wù)簇爆,不予任何處理也不拋出異常。如果允許任務(wù)丟失爽撒,這是最好的一種方案入蛆。
> 
> ```
> 
> ```
> package com.example.morningrundata.config;
> 
> import org.springframework.context.annotation.Configuration;
> import org.springframework.scheduling.annotation.AsyncConfigurer;
> import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
> 
> import java.util.concurrent.Executor;
> import java.util.concurrent.ThreadPoolExecutor;
> 
> @Configuration
> public class TaskExecutorConfig implements AsyncConfigurer {
>     /**
>      * Set the ThreadPoolExecutor's core pool size.
>      */
>     private static final int CORE_POOL_SIZE = 5;
>     /**
>      * Set the ThreadPoolExecutor's maximum pool size.
>      */
>     private static final int MAX_POOL_SIZE = 5;
>     /**
>      * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
>      */
>     private static final int QUEUE_CAPACITY = 1000;
>  
>     /**
>      * 通過(guò)重寫(xiě)getAsyncExecutor方法,制定默認(rèn)的任務(wù)執(zhí)行由該方法產(chǎn)生
>      * <p>
>      * 配置類(lèi)實(shí)現(xiàn)AsyncConfigurer接口并重寫(xiě)getAsyncExcutor方法硕勿,并返回一個(gè)ThreadPoolTaskExevutor
>      * 這樣我們就獲得了一個(gè)基于線(xiàn)程池的TaskExecutor
>      */
>     @Override
>     public Executor getAsyncExecutor() {
>         ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
>         //cpu核數(shù)*2+1
>         taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
>         taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
>         taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
>         taskExecutor.setThreadNamePrefix("test-");
>         taskExecutor.setKeepAliveSeconds(3);
>         taskExecutor.initialize();
>         //設(shè)置線(xiàn)程池拒絕策略哨毁,四種線(xiàn)程池拒絕策略,具體使用哪種策略源武,還得根據(jù)實(shí)際業(yè)務(wù)場(chǎng)景才能做出抉擇
>         taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
>         return taskExecutor;
>     }
> }
> 
> ```



4.徹徹底底解決Spring中@EnableAsync扼褪、@Async異步調(diào)用的使用、原理及源碼分析源碼解釋如下:http://www.reibang.com/p/5f3bf8a12e26



> 配置文件:
> 
> ```
> #核心線(xiàn)程數(shù)
> spring.task.execution.pool.core-size=200
> #最大線(xiàn)程數(shù)
> spring.task.execution.pool.max-size=1000
> #空閑線(xiàn)程保留時(shí)間
> spring.task.execution.pool.keep-alive=3s
> #隊(duì)列容量
> spring.task.execution.pool.queue-capacity=1000
> #線(xiàn)程名稱(chēng)前綴
> spring.task.execution.thread-name-prefix=test-thread-
> 
> ```
> 
> ```
> spring:
>   profiles:
>     #    active: prod
>     active: test
>     #自用
>   task:
>     execution:
>       pool:
>         core-size: 10 #cpu核數(shù)*2+1
>         keep-alive: 3s
>         max-size: 30
>         queue-capacity: 1000
>       thread-name-prefix: thread-
> 
> ```
> 
> 配置類(lèi)是TaskExecutionProperties【org.springframework.boot.autoconfigure.task.TaskExecutionProperties】

3.springboot的線(xiàn)程池的創(chuàng)建的兩種方法

  1. 使用static代碼塊創(chuàng)建

這樣的方式創(chuàng)建的好處是當(dāng)代碼用到線(xiàn)程池的時(shí)候才會(huì)初始化核心線(xiàn)程數(shù)



```
public class HttpApiThreadPool {
  /** 獲取當(dāng)前系統(tǒng)的CPU 數(shù)目*/
  static int cpuNums = Runtime.getRuntime().availableProcessors();
  /** 線(xiàn)程池核心池的大小*/
  private static int corePoolSize = 10;
  /** 線(xiàn)程池的最大線(xiàn)程數(shù)*/
  private static int maximumPoolSize = cpuNums * 5;
 
  public static ExecutorService httpApiThreadPool = null;
  
  
  /**
   * 靜態(tài)方法
   */
  static{
      System.out.println("創(chuàng)建線(xiàn)程數(shù):"+corePoolSize+",最大線(xiàn)程數(shù):"+maximumPoolSize);
      //建立10個(gè)核心線(xiàn)程粱栖,線(xiàn)程請(qǐng)求個(gè)數(shù)超過(guò)20迎捺,則進(jìn)入隊(duì)列等待
      httpApiThreadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L,
              TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),new ThreadFactoryBuilder().setNameFormat("PROS-%d").build());
  }
 
}

```



使用方法:



```
  public static void main(String[] args) {
      HttpApiThreadPool.httpApiThreadPool.execute(()->System.out.println("測(cè)試"));
  }



```



**注意:**



1.不能使用**Executors**的方法創(chuàng)建線(xiàn)程池,這個(gè)是大量的生產(chǎn)事故得出來(lái)的結(jié)論



2.maximumPoolSize本程序使用的是cup數(shù)的5倍查排,你可以看你實(shí)際情況用



3.new ThreadFactoryBuilder().setNameFormat(“PROS-%d”).build() 給每個(gè)線(xiàn)程已名字凳枝,可以方便調(diào)試
  1. 使用static代碼塊創(chuàng)建

```
@Configuration
public class TreadPoolConfig {
  private Logger logger = LoggerFactory.getLogger(TreadPoolConfig.class);
  /** 獲取當(dāng)前系統(tǒng)的CPU 數(shù)目*/
  int cpuNums = Runtime.getRuntime().availableProcessors();
  /** 線(xiàn)程池核心池的大小*/
  private  int corePoolSize = 10;
  /** 線(xiàn)程池的最大線(xiàn)程數(shù)*/
  private  int maximumPoolSize = cpuNums * 5;
  
    /**
     * 消費(fèi)隊(duì)列線(xiàn)程
     * @return
     */
    @Bean(value = "httpApiThreadPool")
    public ExecutorService buildHttpApiThreadPool(){
      logger.info("TreadPoolConfig創(chuàng)建線(xiàn)程數(shù):"+corePoolSize+",最大線(xiàn)程數(shù):"+maximumPoolSize);
        ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L,
              TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),new ThreadFactoryBuilder().setNameFormat("PROS-%d").build());
 
        return pool ;
    }
 
}

```



使用方法:



```
  //注入
    @Resource
  private TreadPoolConfig treadPoolConfig;
   //調(diào)用 
   public void test() {
      treadPoolConfig.buildHttpApiThreadPool().execute(()->System.out.println("tre"));
  }

```

4.其他創(chuàng)建線(xiàn)程池的方法(沒(méi)有用過(guò))

  1. 推薦方式1:
    首先引入:commons-lang3包
```
  ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
        new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

```
  1. 推薦方式 2:
    首先引入:com.google.guava包
```
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("demo-pool-%d").build();
 
    //Common Thread Pool
    ExecutorService pool = new ThreadPoolExecutor(5, 200,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
 
    pool.execute(()-> System.out.println(Thread.currentThread().getName()));
    pool.shutdown();//gracefully shutdown

```
  1. 推薦方式 3:spring配置線(xiàn)程池方式:自定義線(xiàn)程工廠bean需要實(shí)現(xiàn)ThreadFactory,可參考該接口的其它默認(rèn)實(shí)現(xiàn)類(lèi),使用方式直接注入bean
    調(diào)用execute(Runnable task)方法即可
```
<bean id="userThreadPool"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="100" />
        <property name="queueCapacity" value="2000" />
 
    <property name="threadFactory" value= threadFactory />
        <property name="rejectedExecutionHandler">
            <ref local="rejectedExecutionHandler" />
        </property>
    </bean>
    //in code
    userThreadPool.execute(thread);
```
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末岖瑰,一起剝皮案震驚了整個(gè)濱河市叛买,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蹋订,老刑警劉巖率挣,帶你破解...
    沈念sama閱讀 212,383評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異露戒,居然都是意外死亡椒功,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)智什,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)动漾,“玉大人,你說(shuō)我怎么就攤上這事荠锭『得校” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,852評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵证九,是天一觀的道長(zhǎng)删豺。 經(jīng)常有香客問(wèn)我,道長(zhǎng)愧怜,這世上最難降的妖魔是什么呀页? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,621評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮拥坛,結(jié)果婚禮上蓬蝶,老公的妹妹穿的比我還像新娘。我一直安慰自己渴逻,他們只是感情好疾党,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布音诫。 她就那樣靜靜地躺著惨奕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪竭钝。 梳的紋絲不亂的頭發(fā)上梨撞,一...
    開(kāi)封第一講書(shū)人閱讀 49,929評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音香罐,去河邊找鬼卧波。 笑死,一個(gè)胖子當(dāng)著我的面吹牛庇茫,可吹牛的內(nèi)容都是我干的港粱。 我是一名探鬼主播,決...
    沈念sama閱讀 39,076評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼查坪!你這毒婦竟也來(lái)了寸宏?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,803評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤偿曙,失蹤者是張志新(化名)和其女友劉穎氮凝,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體望忆,經(jīng)...
    沈念sama閱讀 44,265評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡罩阵,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了启摄。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片稿壁。...
    茶點(diǎn)故事閱讀 38,716評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖鞋仍,靈堂內(nèi)的尸體忽然破棺而出常摧,到底是詐尸還是另有隱情,我是刑警寧澤威创,帶...
    沈念sama閱讀 34,395評(píng)論 4 333
  • 正文 年R本政府宣布落午,位于F島的核電站,受9級(jí)特大地震影響肚豺,放射性物質(zhì)發(fā)生泄漏溃斋。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評(píng)論 3 316
  • 文/蒙蒙 一吸申、第九天 我趴在偏房一處隱蔽的房頂上張望梗劫。 院中可真熱鬧,春花似錦截碴、人聲如沸梳侨。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)走哺。三九已至,卻和暖如春哲虾,著一層夾襖步出監(jiān)牢的瞬間丙躏,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,027評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工束凑, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留晒旅,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,488評(píng)論 2 361
  • 正文 我出身青樓汪诉,卻偏偏與公主長(zhǎng)得像废恋,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容