SpringBoot框架@Async注解文章:SpringBoot異步調(diào)用@Async
SpringBoot線程池ThreadPoolExecutor文章:SpringBoot線程池ThreadPoolExecutor
ThreadPoolTaskExecutor是一個(gè)spring的線程池技術(shù),其實(shí)匆绣,它的實(shí)現(xiàn)方式完全是使用ThreadPoolExecutor進(jìn)行實(shí)現(xiàn)牙肝。
SpringBoot線程池ThreadPoolTaskExecutor代碼實(shí)現(xiàn)
service層
- 創(chuàng)建一個(gè)service層的接口AsyncService级及,如下:
public interface AsyncService {
/**
* 執(zhí)行異步任務(wù)
* */
void executeAsync();
}
- 對(duì)應(yīng)的AsyncServiceImpl英上,實(shí)現(xiàn)如下:
import com.ceair.service.AsyncService;
import lombok.extern.java.Log;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* 異步線程service
* @author jeffrey_hjf
*/
@Service
@Log
public class AsyncServiceImpl implements AsyncService {
@Override
@Async("asyncServiceExecutor")
public void executeAsync() {
log.info("start executeAsync");
try{
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
log.info("end executeAsync");
}
}
線程池配置
創(chuàng)建一個(gè)配置類ThreadPoolExecutorConfig,用來定義如何創(chuàng)建一個(gè)ThreadPoolTaskExecutor疚漆,要使用@Configuration和@EnableAsync這兩個(gè)注解夭苗,表示這是個(gè)配置類,并且是線程池的配置類缚够,如下所示:
@Configuration
//@EnableAsync //在啟動(dòng)類里面加了@EnableAsync標(biāo)識(shí)幔妨,這里可以不加
@Log
public class ThreadPoolExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize = 5;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize = 5;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity = 99999;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix = "async-service-";
/**
* 異步線程池
*/
@Bean("asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(corePoolSize);
//配置最大線程數(shù)
executor.setMaxPoolSize(maxPoolSize);
//配置隊(duì)列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù)谍椅,而是有調(diào)用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執(zhí)行初始化
executor.initialize();
return executor;
}
}
controller層
創(chuàng)建一個(gè)controller為Hello误堡,里面定義一個(gè)http接口,做的事情是調(diào)用Service層的服務(wù)雏吭,如下:
/**
* @ClassName UserController
* @Author jeffrey_hjf
* @Description User
**/
@RestController
@RequestMapping("/api")
@Log
public class UserController {
@Autowired
private AsyncService asyncService;
/**
* 線程池測(cè)試入口
*/
@GetMapping("executeAsyncPool")
public String executeAsyncPool() {
log.info("start submit");
//調(diào)用service層的任務(wù)
asyncService.executeAsync();
log.info("end submit");
return "success";
}
}
執(zhí)行效果
控制臺(tái)看見日志如下:
2019-08-26 17:28:12.552 INFO 9024 --- [nio-9090-exec-1] com.ceair.controller.UserController : start submit
2019-08-26 17:28:12.558 INFO 9024 --- [nio-9090-exec-1] com.ceair.controller.UserController : end submit
2019-08-26 17:28:12.559 INFO 9024 --- [async-service-1] com.ceair.service.impl.AsyncServiceImpl : start executeAsync
2019-08-26 17:28:13.560 INFO 9024 --- [async-service-1] com.ceair.service.impl.AsyncServiceImpl : end executeAsync
如上日志所示锁施,我們可以看到controller的執(zhí)行線程是”nio-8080-exec-1”,這是tomcat的執(zhí)行線程,而service層的日志顯示線程名為“async-service-1”悉抵,顯然已經(jīng)在我們配置的線程池中執(zhí)行了肩狂,并且每次請(qǐng)求中,controller的起始和結(jié)束日志都是連續(xù)打印的姥饰,表明每次請(qǐng)求都快速響應(yīng)了傻谁,而耗時(shí)的操作都留給線程池中的線程去異步執(zhí)行;
SpringBoot線程池?cái)U(kuò)展ThreadPoolTaskExecutor代碼實(shí)現(xiàn)
雖然我們已經(jīng)用上了線程池列粪,但是還不清楚線程池當(dāng)時(shí)的情況审磁,有多少線程在執(zhí)行,多少在隊(duì)列中等待呢篱竭?這里我創(chuàng)建了一個(gè)ThreadPoolTaskExecutor的子類力图,在每次提交線程的時(shí)候都會(huì)將當(dāng)前線程池的運(yùn)行狀況打印出來。
擴(kuò)展ThreadPoolTaskExecutor類
import lombok.extern.java.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
@Log
public class VisiableThreadPoolTaskExecutorConfig extends ThreadPoolTaskExecutor {
private void showThreadPoolInfo(String prefix){
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if(null==threadPoolExecutor){
return;
}
log.info(String.format("%s, %s,taskCount [%s], completedTaskCount [%s], activeCount [%s], queueSize [%s]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size()));
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
如上所示掺逼,showThreadPoolInfo方法中將任務(wù)總數(shù)吃媒、已完成數(shù)、活躍線程數(shù)吕喘,隊(duì)列大小都打印出來了赘那,然后Override了父類的execute、submit等方法氯质,在里面調(diào)用showThreadPoolInfo方法募舟,這樣每次有任務(wù)被提交到線程池的時(shí)候,都會(huì)將當(dāng)前線程池的基本情況打印到日志中闻察;
修改ThreadPoolExecutorConfig配置類
修改ThreadPoolExecutorConfig.java的asyncServiceExecutor方法拱礁,將ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改為ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(),如下所示:
/**
* 線程池配置類
* @author jeffrey_hjf
*/
@Configuration
//@EnableAsync
@Log
public class ThreadPoolExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize = 5;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize = 5;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity = 99999;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix = "async-service-";
/**
* 異步線程池
*/
@Bean("asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutorConfig();
//配置核心線程數(shù)
executor.setCorePoolSize(corePoolSize);
//配置最大線程數(shù)
executor.setMaxPoolSize(maxPoolSize);
//配置隊(duì)列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候辕漂,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù)呢灶,而是有調(diào)用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執(zhí)行初始化
executor.initialize();
return executor;
}
}
執(zhí)行效果
日志如下:
2019-08-26 17:55:22.091 INFO 15252 --- [nio-9090-exec-8] com.ceair.controller.UserController : start submit
2019-08-26 17:55:22.091 INFO 15252 --- [nio-9090-exec-8] c.c.VisiableThreadPoolTaskExecutorConfig : async-service-, 2. do submit,taskCount [5], completedTaskCount [2], activeCount [2], queueSize [1]
2019-08-26 17:55:22.092 INFO 15252 --- [nio-9090-exec-8] com.ceair.controller.UserController : end submit
2019-08-26 17:55:22.092 INFO 15252 --- [async-service-1] com.ceair.service.impl.AsyncServiceImpl : start executeAsync
2019-08-26 17:55:23.094 INFO 15252 --- [async-service-1] com.ceair.service.impl.AsyncServiceImpl : end executeAsync
這說明提交任務(wù)到線程池的時(shí)候,調(diào)用的是submit(Callable task)這個(gè)方法钉嘹,當(dāng)前已經(jīng)提交了5個(gè)任務(wù)鸯乃,完成了2個(gè),當(dāng)前有2個(gè)線程在處理任務(wù)跋涣,還剩1個(gè)任務(wù)在隊(duì)列中等待缨睡,線程池的基本情況一路了然;
SpringBoot框架@Async注解文章:SpringBoot異步調(diào)用@Async
SpringBoot線程池ThreadPoolExecutor文章:SpringBoot線程池ThreadPoolExecutor