在項(xiàng)目中修赞,當(dāng)訪問(wèn)其他人的接口較慢或者做耗時(shí)任務(wù)時(shí)远舅,不想程序一直卡在耗時(shí)任務(wù)上缚忧,想程序能夠并行執(zhí)行,我們可以使用多線程來(lái)并行的處理任務(wù)专钉,這里介紹下 SpringBoot 下的 @Async 注解挑童,還有 ApplicationEventPublisher 可以了解下
代碼地址
- Github: https://github.com/dolyw/ProjectStudy/tree/master/SpringBoot/AsyncDemo
- Gitee(碼云): https://gitee.com/dolyw/ProjectStudy/tree/master/SpringBoot/AsyncDemo
1. Config
需要一個(gè)注解 @EnableAsync 開啟 @Async 的功能,SpringBoot 可以放在 Application 上跃须,也可以放其他配置文件上
@EnableAsync
@SpringBootApplication
public class Application { }
@Async 配置有兩個(gè)站叼,一個(gè)是執(zhí)行的線程池,一個(gè)是異常處理
執(zhí)行的線程池默認(rèn)情況下找唯一的 org.springframework.core.task.TaskExecutor菇民,或者一個(gè) Bean 的 Name 為 taskExecutor 的 java.util.concurrent.Executor 作為執(zhí)行任務(wù)的線程池尽楔。如果都沒(méi)有的話,會(huì)創(chuàng)建 SimpleAsyncTaskExecutor 線程池來(lái)處理異步方法調(diào)用第练,當(dāng)然 @Async 注解支持一個(gè) String 參數(shù)阔馋,來(lái)指定一個(gè) Bean 的 Name,類型是 Executor 或 TaskExecutor娇掏,表示使用這個(gè)指定的線程池來(lái)執(zhí)行這個(gè)異步任務(wù)
異常處理呕寝,@Async 標(biāo)記的方法只能是 void 或者 Future 返回值,在無(wú)返回值的異步調(diào)用中婴梧,異步處理拋出異常下梢,默認(rèn)是SimpleAsyncUncaughtExceptionHandler 的 handleUncaughtException() 會(huì)捕獲指定異常,只是簡(jiǎn)單的輸出了錯(cuò)誤日志(一般需要自定義配置異常處理)塞蹭,原有任務(wù)還會(huì)繼續(xù)運(yùn)行孽江,直到結(jié)束(具有 void 返回類型的方法不能將任何異常發(fā)送回調(diào)用者,默認(rèn)情況下此類未捕獲異常只會(huì)輸出錯(cuò)誤日志)浮还,而在有返回值的異步調(diào)用中竟坛,異步處理拋出了異常,會(huì)直接返回主線程處理钧舌,異步任務(wù)結(jié)束執(zhí)行担汤,主線程也會(huì)被異步方法中的異常中斷結(jié)束執(zhí)行
@Async有兩個(gè)使用的限制
- 它必須僅適用于 public 方法
- 在同一個(gè)類中調(diào)用異步方法將無(wú)法正常工作(self-invocation)
/**
* AsyncConfig
*
* @author wliduo[i@dolyw.com]
* @date 2020/5/19 17:58
*/
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(AsyncConfig.class);
/**
* 這里不實(shí)現(xiàn)了,使用 ThreadPoolConfig 里的線程池即可
*
* @param
* @return java.util.concurrent.Executor
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/5/19 18:00
*/
/*@Override
public Executor getAsyncExecutor() {
return null;
}*/
/**
* 只能捕獲無(wú)返回值的異步方法洼冻,有返回值的被主線程處理
*
* @param
* @return org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/5/20 10:16
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
/***
* 處理異步方法中未捕獲的異常
*
* @author wliduo[i@dolyw.com]
* @date 2020/5/19 19:16
*/
class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
logger.info("Exception message - {}", throwable.getMessage());
logger.info("Method name - {}", method.getName());
logger.info("Parameter values - {}", Arrays.toString(obj));
if (throwable instanceof Exception) {
Exception exception = (Exception) throwable;
logger.info("exception:{}", exception.getMessage());
}
throwable.printStackTrace();
}
}
}
/**
* 線程池配置
*
* @author wliduo
* @date 2019/2/15 14:36
*/
@Configuration
public class ThreadPoolConfig {
/**
* logger
*/
private final static Logger logger = LoggerFactory.getLogger(ThreadPoolConfig.class);
@Value("${asyncThreadPool.corePoolSize}")
private int corePoolSize;
@Value("${asyncThreadPool.maxPoolSize}")
private int maxPoolSize;
@Value("${asyncThreadPool.queueCapacity}")
private int queueCapacity;
@Value("${asyncThreadPool.keepAliveSeconds}")
private int keepAliveSeconds;
@Value("${asyncThreadPool.awaitTerminationSeconds}")
private int awaitTerminationSeconds;
@Value("${asyncThreadPool.threadNamePrefix}")
private String threadNamePrefix;
/**
* 線程池配置
* @param
* @return java.util.concurrent.Executor
* @author wliduo
* @date 2019/2/15 14:44
*/
@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
logger.info("---------- 線程池開始加載 ----------");
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 核心線程池大小
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
// 最大線程數(shù)
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
// 隊(duì)列容量
threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
// 活躍時(shí)間
threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
// 主線程等待子線程執(zhí)行時(shí)間
threadPoolTaskExecutor.setAwaitTerminationSeconds(awaitTerminationSeconds);
// 線程名字前綴
threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
// RejectedExecutionHandler:當(dāng)pool已經(jīng)達(dá)到max-size的時(shí)候崭歧,如何處理新任務(wù)
// CallerRunsPolicy:不在新線程中執(zhí)行任務(wù),而是由調(diào)用者所在的線程來(lái)執(zhí)行
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
threadPoolTaskExecutor.initialize();
logger.info("---------- 線程池加載完成 ----------");
return threadPoolTaskExecutor;
}
}
2. Code
簡(jiǎn)單的使用撞牢,異步異常處理率碾,Service 方法異步,多個(gè) Service 方法異步屋彪,工具類異步
2.1. Controller
/**
* AsyncController
*
* @author wliduo[i@dolyw.com]
* @date 2020/5/19 14:46
*/
@RestController
@RequestMapping("/async")
public class AsyncController {
/**
* logger
*/
private final static Logger logger = LoggerFactory.getLogger(AsyncController.class);
@Autowired
private AsyncService asyncService;
@Autowired
private SmsUtil smsUtil;
/**
* 可以看到無(wú)返回值異步方法出現(xiàn)異常所宰,主線程還是繼續(xù)執(zhí)行完成
*
* @param
* @return void
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/5/20 9:53
*/
@GetMapping("/run1")
public String run1() throws Exception {
asyncService.task1();
logger.info("run1開始執(zhí)行");
Thread.sleep(5000);
logger.info("run1執(zhí)行完成");
return "run1 success";
}
/**
* 可以看到有返回值異步方法出現(xiàn)異常,異常拋給主線程處理畜挥,導(dǎo)致主線程也被中斷執(zhí)行
*
* @param
* @return java.lang.String
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/5/20 10:15
*/
@GetMapping("/run2")
public String run2() throws Exception {
Future<String> future = asyncService.task2();
// get()方法阻塞主線程仔粥,直到執(zhí)行完成
String result = future.get();
logger.info("run2開始執(zhí)行");
Thread.sleep(5000);
logger.info("run2執(zhí)行完成");
return result;
}
/**
* 多個(gè)異步執(zhí)行
*
* @param
* @return java.lang.String
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/5/20 10:26
*/
@GetMapping("/run3")
public String run3() throws Exception {
logger.info("run3開始執(zhí)行");
long start = System.currentTimeMillis();
Future<String> future3 = asyncService.task3();
Future<String> future4 = asyncService.task4();
// 這樣與下面是一樣的
logger.info(future3.get());
logger.info(future4.get());
// 先判斷是否執(zhí)行完成
boolean run3Done = Boolean.FALSE;
while (true) {
if (future3.isDone() && future4.isDone()) {
// 執(zhí)行完成
run3Done = Boolean.TRUE;
break;
}
if (future3.isCancelled() || future4.isCancelled()) {
// 取消情況
break;
}
}
if (run3Done) {
logger.info(future3.get());
logger.info(future4.get());
} else {
// 其他異常情況
}
long end = System.currentTimeMillis();
logger.info("run3執(zhí)行完成,執(zhí)行時(shí)間: {}", end - start);
return "run3 success";
}
/**
* 工具類異步
*
* @param
* @return java.lang.String
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/5/20 10:59
*/
@GetMapping("/sms")
public String sms() throws Exception {
logger.info("run1開始執(zhí)行");
smsUtil.sendCode("15912347896", "135333");
logger.info("run1執(zhí)行完成");
return "send sms success";
}
}
2.2. ServiceImpl
/**
* AsyncServiceImpl
*
* @author wliduo[i@dolyw.com]
* @date 2020/5/19 14:24
*/
@Service
public class AsyncServiceImpl implements AsyncService {
/**
* logger
*/
private final static Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
@Override
@Async("threadPoolTaskExecutor")
public void task1() throws Exception {
logger.info("task1開始執(zhí)行");
Thread.sleep(3000);
logger.info("task1執(zhí)行結(jié)束");
throw new RuntimeException("出現(xiàn)異常");
}
@Override
@Async("threadPoolTaskExecutor")
public Future<String> task2() throws Exception {
logger.info("task2開始執(zhí)行");
Thread.sleep(3000);
logger.info("task2執(zhí)行結(jié)束");
throw new RuntimeException("出現(xiàn)異常");
// return new AsyncResult<String>("task2 success");
}
@Override
@Async("threadPoolTaskExecutor")
public Future<String> task3() throws Exception {
logger.info("task3開始執(zhí)行");
Thread.sleep(3000);
logger.info("task3執(zhí)行結(jié)束");
return new AsyncResult<String>("task3 success");
}
@Override
@Async("threadPoolTaskExecutor")
public Future<String> task4() throws Exception {
logger.info("task4開始執(zhí)行");
Thread.sleep(3000);
logger.info("task4執(zhí)行結(jié)束");
return new AsyncResult<String>("task4 success");
}
}
2.3. SmsUtil
/**
* SmsUtil
*
* @author wliduo[i@dolyw.com]
* @date 2020/5/20 10:50
*/
@Component
public class SmsUtil {
private static final Logger logger = LoggerFactory.getLogger(SmsUtil.class);
/**
* 異步發(fā)送短信
*
* @param phone
* @param code
* @return void
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/5/20 10:53
*/
@Async
public void sendCode(String phone, String code) {
logger.info("開始發(fā)送驗(yàn)證碼...");
// 模擬調(diào)用接口發(fā)驗(yàn)證碼的耗時(shí)
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("發(fā)送成功: {}", phone);
}
}
參考