一,為什么要使用多個線程池?
使用多個線程池,
把相同的任務(wù)放到同一個線程池中冰蘑,
可以起到隔離的作用涌韩,避免有線程出錯時影響到其他線程池,
例如只有一個線程池時潦蝇,
有兩種任務(wù),下單,處理圖片证舟,
如果線程池被處理圖片的任務(wù)占滿,影響下單任務(wù)的進行
說明:劉宏締的架構(gòu)森林是一個專注架構(gòu)的博客窗骑,地址:https://www.cnblogs.com/architectforest
對應(yīng)的源碼可以訪問這里獲扰稹: https://github.com/liuhongdi/
二,演示項目的相關(guān)信息
1,項目地址:
https://github.com/liuhongdi/multithreadpool
2,項目功能說明:
創(chuàng)建了兩個線程池创译,
一個負責(zé)發(fā)郵件抵知,
另一個負責(zé)處理圖片
實際演示中都是sleep
3,項目結(jié)構(gòu):如圖:
三,java代碼說明:
1,ThreadPoolConfig.java
@Configuration
@EnableAsync public class ThreadPoolConfig { //用來生成縮略圖的線程池
@Bean(name = "imageThreadPool") public ThreadPoolTaskExecutor imageThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 設(shè)置核心線程數(shù),它是可以同時被執(zhí)行的線程數(shù)量
executor.setCorePoolSize(2); // 設(shè)置最大線程數(shù),緩沖隊列滿了之后會申請超過核心線程數(shù)的線程
executor.setMaxPoolSize(10); // 設(shè)置緩沖隊列容量,在執(zhí)行任務(wù)之前用于保存任務(wù)
executor.setQueueCapacity(50); // 設(shè)置線程生存時間(秒),當(dāng)超過了核心線程出之外的線程在生存時間到達之后會被銷毀
executor.setKeepAliveSeconds(60); // 設(shè)置線程名稱前綴
executor.setThreadNamePrefix("imagePool-"); // 設(shè)置拒絕策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true); //初始化
executor.initialize(); return executor;
} //用來發(fā)郵件的線程池
@Bean(name = "emailThreadPool") public ThreadPoolTaskExecutor emailThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 設(shè)置核心線程數(shù),它是可以同時被執(zhí)行的線程數(shù)量
executor.setCorePoolSize(2); // 設(shè)置最大線程數(shù),緩沖隊列滿了之后會申請超過核心線程數(shù)的線程
executor.setMaxPoolSize(10); // 設(shè)置緩沖隊列容量,在執(zhí)行任務(wù)之前用于保存任務(wù)
executor.setQueueCapacity(50); // 設(shè)置線程生存時間(秒),當(dāng)超過了核心線程出之外的線程在生存時間到達之后會被銷毀
executor.setKeepAliveSeconds(60); // 設(shè)置線程名稱前綴
executor.setThreadNamePrefix("emailPool-"); // 設(shè)置拒絕策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true); //初始化
executor.initialize(); return executor;
}
}
說明:配置要使用的線程池软族,按業(yè)務(wù)類型區(qū)分開,
注意命名:一個命名為:emailThreadPool
一個命名為:imageThreadPool
另外注意線程池使用了不同的前綴刷喜,使實際運行時區(qū)分
2,HomeController.java
@RequestMapping("/home")
@Controller public class HomeController {
@Resource private MailService mailService;
@Resource private ImageService imageService;
@Resource private ThreadPoolTaskExecutor imageThreadPool; //監(jiān)控線程池的狀態(tài), //我們得到的數(shù)字,只是大體接近立砸,并不是嚴格的準確數(shù)字
@GetMapping("/poolstatus")
@ResponseBody public String poolstatus() {
String statusStr = ""; int queueSize = imageThreadPool.getThreadPoolExecutor().getQueue().size();
statusStr +="當(dāng)前排隊線程數(shù):" + queueSize; int activeCount = imageThreadPool.getThreadPoolExecutor().getActiveCount();
statusStr +="當(dāng)前活動線程數(shù):" + activeCount; long completedTaskCount = imageThreadPool.getThreadPoolExecutor().getCompletedTaskCount();
statusStr +="執(zhí)行完成線程數(shù):" + completedTaskCount; long taskCount = imageThreadPool.getThreadPoolExecutor().getTaskCount();
statusStr +="總線程數(shù):" + taskCount; return statusStr;
} //異步發(fā)送一封注冊成功的郵件
@GetMapping("/asyncmail")
@ResponseBody public String regMail() {
mailService.sendHtmlMail(); return "mail sended";
} //異步執(zhí)行sleep1秒10次
@GetMapping("/asyncimage")
@ResponseBody public Map<String, Object> asyncsleep() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis();
Map<String, Object> map = new HashMap<>();
List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < 50; i++) {
Future<String> future = imageService.asynctmb(i);
futures.add(future);
}
List<String> response = new ArrayList<>(); for (Future future : futures) {
String string = (String) future.get();
response.add(string);
}
map.put("data", response);
map.put("消耗時間", String.format("任務(wù)執(zhí)行成功,耗時{%s}毫秒", System.currentTimeMillis() - start)); return map;
}
}
3,MailServiceImpl.java
@Service public class MailServiceImpl implements MailService {
private Logger logger= LoggerFactory.getLogger(MailServiceImpl.class);
@Resource private MailUtil mailUtil; //異步發(fā)送html格式的郵件掖疮,演示時只是sleep1秒
@Async(value="emailThreadPool")
@Override public void sendHtmlMail() {
logger.info("sendHtmlMail begin"); try {
Thread.sleep(2000); //延時1秒
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
說明:Async注解指定線程池的名字是:emailThreadPool
4,ImageServiceImpl.java
@Service public class ImageServiceImpl implements ImageService { private Logger logger= LoggerFactory.getLogger(MailServiceImpl.class); //演示處理圖片,只是sleep1秒
@Async(value="imageThreadPool")
@Override public Future<String> asynctmb(int i) {
logger.info("asynctmb begin");
String start= TimeUtil.getMilliTimeNow(); try {
Thread.sleep(1000); //延時1秒
} catch(InterruptedException e) {
e.printStackTrace();
} //log.info("async function sleep end");
String end=TimeUtil.getMilliTimeNow(); return new AsyncResult<>(String.format("asynctmb方法,第 %s 個線程:開始時間:%s,結(jié)束時間:%s",i,start,end));
}
}
說明:Async注解指定線程池的名字是:imageThreadPool
四颗祝,測試效果:
1,測試一個線程:訪問:
http://127.0.0.1:8080/home/asyncmail
查看控制臺:
2020-08-10 14:54:35.671 INFO 2570 --- [ emailPool-1] c.m.demo.service.impl.MailServiceImpl : sendHtmlMail begin
可以看到線程的前綴是emailThreadPool的線程的前綴
2,測試多個線程:訪問:
http://127.0.0.1:8080/home/asyncimage
可以看到返回信息:
...
"消耗時間":"任務(wù)執(zhí)行成功,耗時{25052}毫秒"
執(zhí)行時每次并發(fā)的線程數(shù)是2浊闪,一共創(chuàng)建了50個線程,
每個線程sleep用時1秒
所以共用時25秒螺戳,
3,查看線程池狀態(tài):訪問:
http://127.0.0.1:8080/home/asyncimage
同時訪問:
http://127.0.0.1:8080/home/poolstatus
可以看到返回的狀態(tài)信息:
當(dāng)前排隊線程數(shù):44當(dāng)前活動線程數(shù):2執(zhí)行完成線程數(shù):54總線程數(shù):100
說明:ThreadPoolExecutor中的統(tǒng)計信息只是近似值搁宾,不是完全準確的數(shù)字
進階的動態(tài)注冊線程池
@Value("${service.short.names}")
private String serviceShortNames;
@Bean
public void initExecutors() {
ConfigurableApplicationContext applicationContext = (ConfigurableApplicationContext) ApplicationContextHelper.getApplicationContext();
DefaultListableBeanFactory autowireCapableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
for (String serviceName : serviceShortNames.split(",")) {
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(VisiableThreadPoolTaskExecutor.class);
beanDefinitionBuilder.addPropertyValue("corePoolSize", CORE_POOL_SIZE);
beanDefinitionBuilder.addPropertyValue("maxPoolSize", MAX_POOL_SIZE);
beanDefinitionBuilder.addPropertyValue("queueCapacity", QUEUE_CAPACITY);
beanDefinitionBuilder.addPropertyValue("keepAliveSeconds", KEEP_ALIVE_SECONDS);
beanDefinitionBuilder.addPropertyValue("threadNamePrefix", THREAD_NAME_PREFIX + serviceName + "-");
beanDefinitionBuilder.addPropertyValue("waitForTasksToCompleteOnShutdown", WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN);
beanDefinitionBuilder.setLazyInit(false);
String beanName = serviceName + ASYNC_EXECUTOR_SUFFIX;
beanDefinitionBuilder.setScope(ConfigurableBeanFactory.SCOPE_SINGLETON); // 默認單例
autowireCapableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());
}
}