場(chǎng)景##
公司一站通翻譯同步系統(tǒng) , 需要將客戶保存在美國(guó)站點(diǎn)的文章翻譯并保存到其余22個(gè)站點(diǎn)的數(shù)據(jù)庫。由于翻譯需要耗費(fèi)較長(zhǎng)的時(shí)間,故而使用隊(duì)列將任務(wù)投遞到線程池中處理呼奢,
- Springboot 配置產(chǎn)品同步核心線程池
package cn.configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync // 開啟線程池
public class AsyncThreadPoolConfiguration
{
/**
* 配置默認(rèn)線程池化戳,用于處理一些公共異步任務(wù)
*/
@Bean("defaultThread")
public Executor defaultThread(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);// 核心線程數(shù),
executor.setMaxPoolSize(40);// 并發(fā)線程的數(shù)量限制為2
executor.setQueueCapacity(200); // 線程隊(duì)列
executor.setThreadNamePrefix("defaultThread@");
executor.initialize();
return executor;
}
/**
* 同步產(chǎn)品使用的線程池
* @return
*/
@Bean("syncProduct")
public Executor syncProduct() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(22);// 核心線程數(shù),
executor.setMaxPoolSize(22);// 并發(fā)線程的數(shù)量限制為2
executor.setQueueCapacity(100); // 線程隊(duì)列
executor.setThreadNamePrefix("syncProduct@");
executor.initialize();
return executor;
}
/**
* todo 配置其他功能的線程池
*/
}
2.1 使用方式一 注入線程池對(duì)象 ,通過 lomda表達(dá)式結(jié)合Future實(shí)現(xiàn)
package cn.services;
import cn.utils.SystemUtils;
import cn.utils.TimeUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
@Service
public class IndexServices
{
// 注入線程池對(duì)象
@Resource(name = "syncProduct")
private ThreadPoolTaskExecutor syncProduct;
public void ansyc() throws InterruptedException {
// 初始化任務(wù)結(jié)果集
Map<String,Future<String>> futures = new HashMap<>();
// 模擬需要同步的 22個(gè)子站點(diǎn)
List<String> langs = new ArrayList<>();
for (int i=1;i<=22;i++){
langs.add("lang"+i);
}
// 多線程處理任務(wù)
for (String lang : langs) {
Future<String> future = syncProduct.submit(() -> {
// 真正的業(yè)務(wù)處理
if(lang.equals("lang10")){
Thread.sleep(8000);// 模擬阻塞時(shí)間
}else{
Thread.sleep(3000); // 模擬阻塞時(shí)間
}
System.out.println("打印結(jié)果 : " + lang + "當(dāng)前線程名稱是:"+SystemUtils.getThreadID());
return lang + "...";
});
futures.put(lang,future);
}
// 阻塞等待結(jié)果单料,缺點(diǎn)該種方式無法設(shè)置阻塞等待有效時(shí)間,如果有一個(gè)線程阻塞,會(huì)導(dǎo)致整個(gè)線程池一直等待
for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
while (true){
Future<String> future = entry.getValue();
String lang = entry.getKey();
if(future.isDone() && !future.isCancelled()){
String result = null;
try{
result = future.get();
//result = future.get(6L,TimeUnit.SECONDS); 設(shè)置阻塞等待時(shí)間無效
}catch(ExecutionException e){
e.printStackTrace();
}
System.out.println(lang+" 站點(diǎn)任務(wù)結(jié)果result=" + result + "獲取完成!" + TimeUtils.getCurretDate());
break;
}else{
Thread.sleep(1);//每次輪詢休息1毫秒(CPU納秒級(jí))点楼,避免CPU高速輪循耗空CPU 扫尖,這個(gè)至關(guān)重要
}
}
}
/*
// 設(shè)置阻塞時(shí)間有效版本
for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
while (true){
Future<String> future = entry.getValue();
String lang = entry.getKey();
String result = null;
try{
result = future.get(3L,TimeUnit.SECONDS);
}catch(ExecutionException e){
e.printStackTrace();
}catch (TimeoutException e){
System.out.println("存在超時(shí)的任務(wù),無法獲取結(jié)果");
}
System.out.println(lang+" 站點(diǎn)任務(wù)結(jié)果result=" + result + "獲取完成!" + TimeUtils.getCurretDate());
break;
}
}
*/
}
}
2.2 上線方式二,通過Springboot的Ansyc注解實(shí)現(xiàn)任務(wù)并發(fā)
實(shí)現(xiàn)思路是 寫一個(gè) @Ansyc 注解標(biāo)記的業(yè)務(wù)方法掠廓,在外層循環(huán)調(diào)用换怖,通過參數(shù)控制實(shí)現(xiàn)不同的邏輯
/**
* 翻譯英文站點(diǎn)文章 并同步到22個(gè)子站點(diǎn)
*/
// 業(yè)務(wù)核心
@Async("syncProduceToSite")
public CompletableFuture<String> syncProduceToSite(String lang) throws InterruptedException {
// 通過 參數(shù) lang的不同,實(shí)現(xiàn)翻譯成不同的語言蟀瞧,并推送到不同站點(diǎn)的數(shù)據(jù)庫
Thread.sleep(8000L);
String results = lang + " success";
String name = SystemUtils.getCurrentThreadName();
return CompletableFuture.completedFuture("站點(diǎn)"+lang + "處理的結(jié)果是:"+results+";線程名稱是:"+name);
}
在外層調(diào)用過程
// 模擬準(zhǔn)備 22個(gè)站點(diǎn)的語言標(biāo)識(shí)符參數(shù)
List<String> langs = new ArrayList<>();
for (int i=1;i<=22;++i){
langs.add("lang"+i);
}
// 初始化結(jié)果集
Map<String,CompletableFuture<String>> futureMap = new HashMap<>();
for (String lang: langs) {
// 業(yè)務(wù)執(zhí)行
CompletableFuture<String> future = authServices.syncProduceToSite(lang);
futureMap.put(lang,future); // 將結(jié)果集放入 map
}
// 獲取結(jié)果集
for (Map.Entry<String, CompletableFuture<String>> entry : futureMap.entrySet()){
String lang = entry.getKey();
CompletableFuture<String> future = entry.getValue();
try {
// CompletableFuture.anyOf(future).join(); // 阻塞等待結(jié)果的返回,加上這一句沉颂,那么下面的阻塞時(shí)間就無效
String result= future.get(2L, TimeUnit.SECONDS);
System.out.println("站點(diǎn):"+lang+" 得到結(jié)果啦 "+result);
} catch (TimeoutException e) {
// e.printStackTrace();
System.out.println("站點(diǎn):"+lang+" 超時(shí),無法獲取結(jié)果");
}
}