springboot之多任務并行+線程池處理
最近項目中做到一個關于批量發(fā)短信的業(yè)務,如果用戶量特別大的話,不能使用單線程去發(fā)短信舶治,只能嘗試著使用多任務來完成
- Java 線程池
Java通過Executors提供四種線程池,分別為:
newCachedThreadPool創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要婆排,可靈活回收空閑線程声旺,若無可回收,則新建線程段只。
newFixedThreadPool 創(chuàng)建一個定長線程池腮猖,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待赞枕。
newScheduledThreadPool 創(chuàng)建一個定長線程池澈缺,支持定時及周期性任務執(zhí)行。
newSingleThreadExecutor 創(chuàng)建一個單線程化的線程池炕婶,它只會用唯一的工作線程來執(zhí)行任務姐赡,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
優(yōu)點
重用存在的線程古话,減少對象創(chuàng)建雏吭、消亡的開銷,性能佳陪踩。
可有效控制最大并發(fā)線程數(shù)杖们,提高系統(tǒng)資源的使用率,同時避免過多資源競爭肩狂,避免堵塞摘完。
提供定時執(zhí)行、定期執(zhí)行傻谁、單線程孝治、并發(fā)數(shù)控制等功能。
- 方式一 CountDownLatch
public class StatsDemo {
final static SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
final static String startTime = sdf.format(new Date());
/**
* IO密集型任務 = 一般為2*CPU核心數(shù)(常出現(xiàn)于線程中:數(shù)據(jù)庫數(shù)據(jù)交互审磁、文件上傳下載谈飒、網(wǎng)絡數(shù)據(jù)傳輸?shù)鹊龋? * CPU密集型任務 = 一般為CPU核心數(shù)+1(常出現(xiàn)于線程中:復雜算法)
* 混合型任務 = 視機器配置和復雜度自測而定
*/
private static int corePoolSize = Runtime.getRuntime().availableProcessors();
/**
* public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
* TimeUnit unit,BlockingQueue<Runnable> workQueue)
* corePoolSize用于指定核心線程數(shù)量
* maximumPoolSize指定最大線程數(shù)
* keepAliveTime和TimeUnit指定線程空閑后的最大存活時間
* workQueue則是線程池的緩沖隊列,還未執(zhí)行的線程會在隊列中等待
* 監(jiān)控隊列長度,確保隊列有界
* 不當?shù)木€程池大小會使得處理速度變慢态蒂,穩(wěn)定性下降杭措,并且導致內(nèi)存泄露。如果配置的線程過少钾恢,則隊列會持續(xù)變大手素,消耗過多內(nèi)存。
* 而過多的線程又會 由于頻繁的上下文切換導致整個系統(tǒng)的速度變緩——殊途而同歸瘩蚪。隊列的長度至關重要泉懦,它必須得是有界的,這樣如果線程池不堪重負了它可以暫時拒絕掉新的請求疹瘦。
* ExecutorService 默認的實現(xiàn)是一個無界的 LinkedBlockingQueue崩哩。
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000));
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
//使用execute方法
executor.execute(new Stats("任務A", 1000, latch));
executor.execute(new Stats("任務B", 1000, latch));
executor.execute(new Stats("任務C", 1000, latch));
executor.execute(new Stats("任務D", 1000, latch));
executor.execute(new Stats("任務E", 1000, latch));
latch.await();// 等待所有人任務結(jié)束
System.out.println("所有的統(tǒng)計任務執(zhí)行完成:" + sdf.format(new Date()));
}
static class Stats implements Runnable {
String statsName;
int runTime;
CountDownLatch latch;
public Stats(String statsName, int runTime, CountDownLatch latch) {
this.statsName = statsName;
this.runTime = runTime;
this.latch = latch;
}
public void run() {
try {
System.out.println(statsName+ " do stats begin at "+ startTime);
//模擬任務執(zhí)行時間
Thread.sleep(runTime);
System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
latch.countDown();//單次任務結(jié)束,計數(shù)器減一
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- 方式二 Future
重點是和springboot整合拱礁,采用注解bean方式生成ThreadPoolTaskExecutor
@Bean
//spring依賴包
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class GlobalConfig {
/**
* 默認線程池線程池
*
* @return Executor
*/
@Bean
public ThreadPoolTaskExecutor defaultThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心線程數(shù)目
executor.setCorePoolSize(16);
//指定最大線程數(shù)
executor.setMaxPoolSize(64);
//隊列中最大的數(shù)目
executor.setQueueCapacity(16);
//線程名稱前綴
executor.setThreadNamePrefix("defaultThreadPool_");
//rejection-policy:當pool已經(jīng)達到max size的時候琢锋,如何處理新任務
//CALLER_RUNS:不在新線程中執(zhí)行任務辕漂,而是由調(diào)用者所在的線程來執(zhí)行
//對拒絕task的處理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//線程空閑后的最大存活時間
executor.setKeepAliveSeconds(60);
//加載
executor.initialize();
return executor;
}
}
使用
//通過注解引入配置
@Resource(name = "defaultThreadPool")
private ThreadPoolTaskExecutor executor;
//使用Future方式執(zhí)行多任務
//生成一個集合
List<Future> futures = new ArrayList<>();
//獲取后臺全部有效運營人員的集合
List<AdminUserMsgResponse> adminUserDOList = adminManagerService.GetUserToSentMsg(null);
for (AdminUserMsgResponse response : adminUserDOList) {
//并發(fā)處理
if (response.getMobile() != null) {
Future<?> future = executor.submit(() -> {
//發(fā)送短信
mobileMessageFacade.sendCustomerMessage(response.getMobile(), msgConfigById.getContent());
});
futures.add(future);
}
}
//查詢?nèi)蝿請?zhí)行的結(jié)果
for (Future<?> future : futureList) {
while (true) {//CPU高速輪詢:每個future都并發(fā)輪循呢灶,判斷完成狀態(tài)然后獲取結(jié)果吴超,這一行,是本實現(xiàn)方案的精髓所在鸯乃。即有10個future在高速輪詢鲸阻,完成一個future的獲取結(jié)果,就關閉一個輪詢
if (future.isDone()&& !future.isCancelled()) {//獲取future成功完成狀態(tài)缨睡,如果想要限制每個任務的超時時間鸟悴,取消本行的狀態(tài)判斷+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超時異常使用即可。
Integer i = future.get();//獲取結(jié)果
System.out.println("任務i="+i+"獲取完成!"+new Date());
list.add(i);
break;//當前future獲取結(jié)果完畢奖年,跳出while
} else {
Thread.sleep(1);//每次輪詢休息1毫秒(CPU納秒級)细诸,避免CPU高速輪循耗空CPU---》新手別忘記這個
}
}
}