1衷模、啟動(dòng)類注入線程
@Bean(value = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(20);
scheduler.setThreadNamePrefix("task-");
//用來設(shè)置線程池關(guān)閉的時(shí)候等待所有任務(wù)都完成再繼續(xù)銷毀其他的Bean薪寓,
scheduler.setWaitForTasksToCompleteOnShutdown(true);
//線程池對(duì)拒絕任務(wù)的處理策略:這里采用了CallerRunsPolicy策略粤咪,當(dāng)線程池沒有處理能力的時(shí)候幽纷,該策略會(huì)直接在 execute 方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù);如果執(zhí)行程序已關(guān)閉书在,則會(huì)丟棄該任務(wù)
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return scheduler;
}
2、定時(shí)任務(wù)
private Map<Long, ReporterCheck> lastMap = new HashMap<>();
private Map<Long, ScheduledFuture> futureMap = new ConcurrentHashMap<>();
@Scheduled(cron = "0 0/5 * * * ?")
public void dynamic() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = (ThreadPoolTaskScheduler) taskScheduler;
//切換數(shù)據(jù)源
DataBaseTypeEnum.changeDataBaseTypeEnum(DataBaseTypeEnum.REPORTER.getDesc());
List<ReporterCheck> reporterChecks = reportCheckMapper.selectAll();
Map<Long, ReporterCheck> currReport = new HashMap<>();
//循環(huán)當(dāng)前報(bào)表任務(wù)
reporterChecks.forEach(reporterCheck -> {
Long rid = reporterCheck.getId();
currReport.put(rid, reporterCheck);
if (!lastMap.containsKey(rid)) {
//新增拆又,任務(wù)
ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(new Task(rid),
new CronTrigger(reporterCheck.getCorn()));
futureMap.put(rid, scheduledFuture);
}
});
//循環(huán)上次的報(bào)表任務(wù)
for (Long key : lastMap.keySet()) {
if (currReport.containsKey(key)
&& !currReport.get(key).getCorn().equals(lastMap.get(key).getCorn())) {
//cron表達(dá)式有變動(dòng)儒旬,則更新
futureMap.get(key).cancel(false);
ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(new Task(key),
new CronTrigger(currReport.get(key).getCorn()));
futureMap.put(key, scheduledFuture);
} else if (!currReport.containsKey(key)) {
//刪除任務(wù)
futureMap.get(key).cancel(true);
futureMap.remove(key);
}
}
//重新賦值上次任務(wù)列表
lastMap = currReport;
}
class Task implements Runnable {
private Long id;
public Task(Long id) {
this.id = id;
}
@Override
public void run() {
try {
logger.info("執(zhí)行任務(wù){(diào)}開始", id);
reportCheckService.handleReport(id);
logger.info("執(zhí)行任務(wù){(diào)}結(jié)束", id);
} catch (Exception e) {
logger.error("執(zhí)行報(bào)表任務(wù)ID=" + id + "異常", e.getMessage());
}
}
}
3、邏輯執(zhí)行
public void handleReport(Long id) {
//切換數(shù)據(jù)源
DataBaseTypeEnum.changeDataBaseTypeEnum(DataBaseTypeEnum.REPORTER.getDesc());
ReporterCheck reporterCheck = reporterCheckMapper.selectByPrimaryKey(id);
//組件:ModuleType=1 sql:ModuleType=2
if (reporterCheck.getModuleType().equals(1)) {
//API
Map<String, Object> map = (Map<String, Object>) produceIndicatorData(reporterCheck);
if (!map.isEmpty()) {
//SendMail
sendService.sendMail(reporterCheck.getTopic(), reporterCheck.getContent() + ":\r\n" + formatPut(map), null, reporterCheck.getToUcid());
}
} else {
//SQL params
List<Map<String, Object>> res = getResultFromSQL(reporterCheck);
List<Map<String, Object>> contentList = new ArrayList<>();
Map<String, String> expectedMap = new HashMap<>();
String[] expectArray = reporterCheck.getExpectRes().split(";");
for (String item : expectArray) {
String[] keyValue = item.split(":");
if (keyValue.length == 2) {
expectedMap.put(keyValue[0], keyValue[1]);
}
}
//沒有指定期望
if (expectedMap.isEmpty()) {
contentList.addAll(res);
} else {
boolean flag = false;
for (Map<String, Object> item : res) {
for (String key : expectedMap.keySet()) {
if (item.get(key) == null || !expectedMap.get(key).equals(item.get(key).toString())) {
flag = true;
break;
}
}
if (flag) {
contentList.add(item);
}
flag = false;
}
}
//求優(yōu)雅輸出
System.out.println();
if (!contentList.isEmpty()) {
//SendMail
sendService.sendMail(reporterCheck.getTopic(), reporterCheck.getContent() + ":\r\n" + formatPut(contentList), null, reporterCheck.getToUcid());
}
}
}
private List<Map<String, Object>> getResultFromSQL(ReporterCheck reporterCheck) {
log.info("topic:{}", reporterCheck.getTopic());
//切換數(shù)據(jù)源
DataBaseTypeEnum.changeDataBaseTypeEnum(reporterCheck.getDataSource());
List<Map<String, Object>> res = reporterCheckMapper.querySql(reporterCheck.getModule());
return res;
}
private String formatPut(Object contentList) {
String content = JSONObject.toJSONString(contentList);
ObjectMapper mapper = new ObjectMapper();
Object obj = null;
try {
obj = mapper.readValue(content, Object.class);
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj);
} catch (IOException e) {
log.info("內(nèi)容輸出格式化失斕濉:{}", e.getMessage());
}
return "";
}
public Object produceIndicatorData(ReporterCheck reporterCheck) {
String[] api = reporterCheck.getModule().split("#");
try {
String className = SpringUtil.decapitalize(api[0].substring(api[0].lastIndexOf(".") + 1, api[0].length()));
Object obj = SpringUtil.getBean(className);
Class clazz = obj.getClass();
Method method = clazz.getDeclaredMethod(api[1], ReporterCheck.class, Map.class);
Map<String, Object> map = new HashMap<>();
String[] params = reporterCheck.getParams().split(";");
for (String item : params) {
String[] keyValue = item.split(":");
if (keyValue.length == 2) {
map.put(keyValue[0], keyValue[1]);
}
}
return method.invoke(obj, reporterCheck, map);
} catch (NoSuchMethodException e) {
log.info("獲取指定方式失斦辉础:{}", e.getMessage());
} catch (IllegalAccessException e) {
log.info("反射調(diào)用方法參數(shù)錯(cuò)誤:{}", e.getMessage());
} catch (InvocationTargetException e) {
log.info("反射調(diào)用方法InvocationTargetException錯(cuò)誤:{}", e);
}
return null;
}