SpringBoot中@Scheduled實(shí)現(xiàn)多線(xiàn)程并發(fā)定時(shí)任務(wù)
1.背景
- Spring Boot實(shí)現(xiàn)定時(shí)任務(wù)非常容易钝腺,只需要使用Spring自帶的Schedule注解
@Scheduled(cron = "0 */1 * * * ?") public void cancleOrderTask() { //實(shí)現(xiàn)業(yè)務(wù) }
- 記得在啟動(dòng)類(lèi)中開(kāi)啟定時(shí)任務(wù)
``` @EnableScheduling //開(kāi)啟定時(shí)任務(wù) ```
- 定時(shí)任務(wù)開(kāi)啟成功,但所有的任務(wù)都是在同一個(gè)線(xiàn)程池中的同一個(gè)線(xiàn)程來(lái)完成的。在實(shí)際開(kāi)發(fā)過(guò)程中,我們當(dāng)然不希望所有的任務(wù)都運(yùn)行在一個(gè)線(xiàn)程中
[圖片上傳中...(image-75c393-1640765676888-2)]
2.方案解決
方案一:
1:通過(guò)ScheduleConfig配置文件實(shí)現(xiàn)SchedulingConfigurer接口刚梭,并重寫(xiě)setSchedulerfang方法
package com.lds.springbootdemo.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @Configuration public class ScheduledConfig implements SchedulingConfigurer { @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { scheduledTaskRegistrar.setScheduler(setTaskExecutors()); } @Bean(destroyMethod="shutdown") public Executor setTaskExecutors(){ // 10個(gè)線(xiàn)程來(lái)處理藤为。 return Executors.newScheduledThreadPool(10); } }
[圖片上傳中...(image-73de4e-1640765676890-3)]
2:創(chuàng)建Bean
package com.example.morningrundata.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration public class TaskSchedulerConfig { //線(xiàn)程池應(yīng)該交給容器管理 @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(10); return scheduler; } }
方案二:
1.@Async異步+線(xiàn)程池的兩種方式
- 在啟動(dòng)類(lèi)加上@EnableAsync(不一定是啟動(dòng)類(lèi),可以是controller暮顺、service等啟動(dòng)時(shí)加載)
``` package com.example.worktest.async; @SpringBootApplication @EnableAsync public class AsyncApplication { public static void main(String[] args) { SpringApplication.run(AsyncApplication.class, args); } } ```
- @Async注解,可以在類(lèi)秀存,方法捶码,controller,service
``` package com.example.morningrundata.task; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * 定時(shí)查詢(xún)學(xué)生晨跑記錄 * @author Administrator */ @Component @Slf4j @EnableScheduling @Async public class TimerProcessTaskTest { @Scheduled(cron = "0/2 * * * * ?") public void doTask() throws InterruptedException { log.info(Thread.currentThread().getName()+"===task run"); Thread.sleep(5); } @Scheduled(cron = "0/2 * * * * ?") public void doTask1() throws InterruptedException { log.info(Thread.currentThread().getName()+"===task end"); } } ``` [圖片上傳中...(image-132da2-1640765676888-1)]
- 解釋
> @Async異步方法默認(rèn)使用Spring創(chuàng)建ThreadPoolTaskExecutor(參考TaskExecutionAutoConfiguration), > > 其中默認(rèn)核心線(xiàn)程數(shù)為8, 默認(rèn)最大隊(duì)列和默認(rèn)最大線(xiàn)程數(shù)都是Integer.MAX_VALUE. 創(chuàng)建新線(xiàn)程的條件是隊(duì)列填滿(mǎn)時(shí), 而 > > 這樣的配置隊(duì)列永遠(yuǎn)不會(huì)填滿(mǎn), 如果有@Async注解標(biāo)注的方法長(zhǎng)期占用線(xiàn)程(比如HTTP長(zhǎng)連接等待獲取結(jié)果), > > **在核心8個(gè)線(xiàn)程數(shù)占用滿(mǎn)了之后, 新的調(diào)用就會(huì)進(jìn)入隊(duì)列, 外部表現(xiàn)為沒(méi)有執(zhí)行.** > > [圖片上傳中...(image-bf6783-1640765676887-0)] > > ``` > 解決: > > 手動(dòng)配置相應(yīng)屬性即可. 比如 > spring.task.execution.pool.queueCapacity=4 > spring.task.execution.pool.coreSize=20 > > ``` > > ``` > 備注: > > 此處沒(méi)有配置maxSize, 仍是默認(rèn)的Integer.MAX_VALUE. 如果配置的話(huà), 請(qǐng)考慮達(dá)到最大線(xiàn)程數(shù)時(shí)的處理策略(JUC包查找RejectedExecutionHandler的實(shí)現(xiàn)類(lèi)) > > (默認(rèn)為拒絕執(zhí)行AbortPolicy, 即拋出異常) > > AbortPolicy: 直接拋出java.util.concurrent.RejectedExecutionException異常 > > CallerRunsPolicy: 主線(xiàn)程直接執(zhí)行該任務(wù)或链,執(zhí)行完之后嘗試添加下一個(gè)任務(wù)到線(xiàn)程池中惫恼,可以有效降低向線(xiàn)程池內(nèi)添加任務(wù)的速度 > > DiscardOldestPolicy: 拋棄舊的任務(wù) > > DiscardPolicy: 拋棄當(dāng)前任務(wù) > > //更好的解釋 > AbortPolicy:直接拋出 RejectedExecutionException 異常并阻止系統(tǒng)正常運(yùn)行。 > CallerRunsPolicy:“調(diào)用者運(yùn)行”機(jī)制澳盐,該策略既不會(huì)拋棄任務(wù)祈纯,也不會(huì)拋出異常,而是將某些任務(wù)回退到調(diào)用者叼耙,由調(diào)用者來(lái)完成任務(wù)腕窥。 > DiscardOldestPolicy:拋棄隊(duì)列中等待最久的任務(wù),然后把當(dāng)前任務(wù)加入隊(duì)列中嘗試再次提交當(dāng)前任務(wù)筛婉。 > DiscarePolicy:直接丟棄任務(wù)簇爆,不予任何處理也不拋出異常。如果允許任務(wù)丟失爽撒,這是最好的一種方案入蛆。 > > ``` > > ``` > package com.example.morningrundata.config; > > import org.springframework.context.annotation.Configuration; > import org.springframework.scheduling.annotation.AsyncConfigurer; > import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; > > import java.util.concurrent.Executor; > import java.util.concurrent.ThreadPoolExecutor; > > @Configuration > public class TaskExecutorConfig implements AsyncConfigurer { > /** > * Set the ThreadPoolExecutor's core pool size. > */ > private static final int CORE_POOL_SIZE = 5; > /** > * Set the ThreadPoolExecutor's maximum pool size. > */ > private static final int MAX_POOL_SIZE = 5; > /** > * Set the capacity for the ThreadPoolExecutor's BlockingQueue. > */ > private static final int QUEUE_CAPACITY = 1000; > > /** > * 通過(guò)重寫(xiě)getAsyncExecutor方法,制定默認(rèn)的任務(wù)執(zhí)行由該方法產(chǎn)生 > * <p> > * 配置類(lèi)實(shí)現(xiàn)AsyncConfigurer接口并重寫(xiě)getAsyncExcutor方法硕勿,并返回一個(gè)ThreadPoolTaskExevutor > * 這樣我們就獲得了一個(gè)基于線(xiàn)程池的TaskExecutor > */ > @Override > public Executor getAsyncExecutor() { > ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); > //cpu核數(shù)*2+1 > taskExecutor.setCorePoolSize(CORE_POOL_SIZE); > taskExecutor.setMaxPoolSize(MAX_POOL_SIZE); > taskExecutor.setQueueCapacity(QUEUE_CAPACITY); > taskExecutor.setThreadNamePrefix("test-"); > taskExecutor.setKeepAliveSeconds(3); > taskExecutor.initialize(); > //設(shè)置線(xiàn)程池拒絕策略哨毁,四種線(xiàn)程池拒絕策略,具體使用哪種策略源武,還得根據(jù)實(shí)際業(yè)務(wù)場(chǎng)景才能做出抉擇 > taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); > return taskExecutor; > } > } > > ``` 4.徹徹底底解決Spring中@EnableAsync扼褪、@Async異步調(diào)用的使用、原理及源碼分析源碼解釋如下:http://www.reibang.com/p/5f3bf8a12e26 > 配置文件: > > ``` > #核心線(xiàn)程數(shù) > spring.task.execution.pool.core-size=200 > #最大線(xiàn)程數(shù) > spring.task.execution.pool.max-size=1000 > #空閑線(xiàn)程保留時(shí)間 > spring.task.execution.pool.keep-alive=3s > #隊(duì)列容量 > spring.task.execution.pool.queue-capacity=1000 > #線(xiàn)程名稱(chēng)前綴 > spring.task.execution.thread-name-prefix=test-thread- > > ``` > > ``` > spring: > profiles: > # active: prod > active: test > #自用 > task: > execution: > pool: > core-size: 10 #cpu核數(shù)*2+1 > keep-alive: 3s > max-size: 30 > queue-capacity: 1000 > thread-name-prefix: thread- > > ``` > > 配置類(lèi)是TaskExecutionProperties【org.springframework.boot.autoconfigure.task.TaskExecutionProperties】
3.springboot的線(xiàn)程池的創(chuàng)建的兩種方法
使用static代碼塊創(chuàng)建
這樣的方式創(chuàng)建的好處是當(dāng)代碼用到線(xiàn)程池的時(shí)候才會(huì)初始化核心線(xiàn)程數(shù) ``` public class HttpApiThreadPool { /** 獲取當(dāng)前系統(tǒng)的CPU 數(shù)目*/ static int cpuNums = Runtime.getRuntime().availableProcessors(); /** 線(xiàn)程池核心池的大小*/ private static int corePoolSize = 10; /** 線(xiàn)程池的最大線(xiàn)程數(shù)*/ private static int maximumPoolSize = cpuNums * 5; public static ExecutorService httpApiThreadPool = null; /** * 靜態(tài)方法 */ static{ System.out.println("創(chuàng)建線(xiàn)程數(shù):"+corePoolSize+",最大線(xiàn)程數(shù):"+maximumPoolSize); //建立10個(gè)核心線(xiàn)程粱栖,線(xiàn)程請(qǐng)求個(gè)數(shù)超過(guò)20迎捺,則進(jìn)入隊(duì)列等待 httpApiThreadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),new ThreadFactoryBuilder().setNameFormat("PROS-%d").build()); } } ``` 使用方法: ``` public static void main(String[] args) { HttpApiThreadPool.httpApiThreadPool.execute(()->System.out.println("測(cè)試")); } ``` **注意:** 1.不能使用**Executors**的方法創(chuàng)建線(xiàn)程池,這個(gè)是大量的生產(chǎn)事故得出來(lái)的結(jié)論 2.maximumPoolSize本程序使用的是cup數(shù)的5倍查排,你可以看你實(shí)際情況用 3.new ThreadFactoryBuilder().setNameFormat(“PROS-%d”).build() 給每個(gè)線(xiàn)程已名字凳枝,可以方便調(diào)試
使用static代碼塊創(chuàng)建
``` @Configuration public class TreadPoolConfig { private Logger logger = LoggerFactory.getLogger(TreadPoolConfig.class); /** 獲取當(dāng)前系統(tǒng)的CPU 數(shù)目*/ int cpuNums = Runtime.getRuntime().availableProcessors(); /** 線(xiàn)程池核心池的大小*/ private int corePoolSize = 10; /** 線(xiàn)程池的最大線(xiàn)程數(shù)*/ private int maximumPoolSize = cpuNums * 5; /** * 消費(fèi)隊(duì)列線(xiàn)程 * @return */ @Bean(value = "httpApiThreadPool") public ExecutorService buildHttpApiThreadPool(){ logger.info("TreadPoolConfig創(chuàng)建線(xiàn)程數(shù):"+corePoolSize+",最大線(xiàn)程數(shù):"+maximumPoolSize); ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),new ThreadFactoryBuilder().setNameFormat("PROS-%d").build()); return pool ; } } ``` 使用方法: ``` //注入 @Resource private TreadPoolConfig treadPoolConfig; //調(diào)用 public void test() { treadPoolConfig.buildHttpApiThreadPool().execute(()->System.out.println("tre")); } ```
4.其他創(chuàng)建線(xiàn)程池的方法(沒(méi)有用過(guò))
- 推薦方式1:
首先引入:commons-lang3包``` ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()); ```
- 推薦方式 2:
首先引入:com.google.guava包``` ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); //Common Thread Pool ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); pool.execute(()-> System.out.println(Thread.currentThread().getName())); pool.shutdown();//gracefully shutdown ```
- 推薦方式 3:spring配置線(xiàn)程池方式:自定義線(xiàn)程工廠bean需要實(shí)現(xiàn)ThreadFactory,可參考該接口的其它默認(rèn)實(shí)現(xiàn)類(lèi),使用方式直接注入bean
調(diào)用execute(Runnable task)方法即可``` <bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="10" /> <property name="maxPoolSize" value="100" /> <property name="queueCapacity" value="2000" /> <property name="threadFactory" value= threadFactory /> <property name="rejectedExecutionHandler"> <ref local="rejectedExecutionHandler" /> </property> </bean> //in code userThreadPool.execute(thread); ```