定時(shí)任務(wù)掃描訂單數(shù)據(jù)
任務(wù)描述:
設(shè)計(jì)為主任務(wù)和子任務(wù),主任務(wù)只處理指定時(shí)間片怀大,子任務(wù)會(huì)拆分主任務(wù)的時(shí)間片。其目的為拆分后的子任務(wù)可以并行執(zhí)行呀闻,不會(huì)干擾化借,提供吞吐量。
主任務(wù)狀態(tài):新建捡多、執(zhí)行中蓖康、執(zhí)行完畢
多次觸發(fā)時(shí)铐炫,沒有主任務(wù)則新建,執(zhí)行中不處理蒜焊,當(dāng)前任務(wù)執(zhí)行完畢倒信,則需新建下一個(gè)時(shí)間段的主任務(wù)。
主任務(wù)初始化:
觸發(fā) job泳梆,主任務(wù)領(lǐng)域通過【加載最近執(zhí)行的任務(wù)】或【新建主任務(wù)】方式鳖悠,初始化主任務(wù)信息。
//獲取最近同步的主任務(wù)
OrderSyncMainTask lastSyncMainTask = MainTaskFactory.getOrInitLastMainTask();
public static OrderSyncMainTask getOrInitLastMainTask() {
// 任務(wù)查詢并啟動(dòng)鸭丛,暫時(shí)未考慮連續(xù)觸發(fā)的情況(并發(fā))
OrderSyncMainTask orderSyncMainTask = omsOrderSyncTaskRepo.getLastMainTask();
if (orderSyncMainTask == null) {
orderSyncMainTask = initMainTask(null);
}
return orderSyncMainTask;
}
問題描述:
這里【主任務(wù)領(lǐng)域】初始化的方式竞穷,采用工廠+靜態(tài)方法,對(duì)于外部傳入的入?yún)⒘鄹龋呛苋菀准庸ど赡繕?biāo)類的瘾带,但如果需要再次讀取 db 庫的數(shù)據(jù),工廠類就需要引入(注入)相關(guān) service熟菲、repository看政。
@Slf4j
@Component
public class MainTaskFactory {
static OmsOrderSyncTaskRepo omsOrderSyncTaskRepo;
//從spring容器找到要用的bean
@Autowired
public void setOmsOrderSyncTaskRepo(OmsOrderSyncTaskRepo omsOrderSyncTaskRepo) {
MainTaskFactory.omsOrderSyncTaskRepo = omsOrderSyncTaskRepo;
}
//...
}
現(xiàn)在出現(xiàn)一個(gè)我們不熟悉的依賴方式,靜態(tài)工廠的靜態(tài)屬性抄罕,需要引入 spring 的 bean 實(shí)例允蚣。
實(shí)際上,讓 MainTaskFactory 做了初始化呆贿,從持久化數(shù)據(jù)庫讀數(shù)據(jù)嚷兔,就必然依賴 spring 中的 bean:
整理下 MainTaskFactory 的工作職責(zé):
- 職責(zé) 1: 為了通過靜態(tài)方法使用其它 bean,將自己作為 bean 組件做入,并通過 static set 方式引入要用的 bean冒晰。
- 職責(zé) 2:獲取、初始化 MainTask竟块。
- create MainTask壶运,實(shí)際上 new MainTask
- 職責(zé) 3:持久化主任務(wù),為此需要職責(zé) 1浪秘。
總結(jié)為:factory 做了查詢蒋情、構(gòu)造、保存工作耸携,這里分工混亂棵癣。
誰來讀取持久化的數(shù)據(jù)(分工);
如何讀取持久化的數(shù)據(jù)(實(shí)現(xiàn)方式)夺衍;
誰來保存 domain 信息浙巫;
repo.save(domain)
domain 轉(zhuǎn)換出 dto1,dto2,dto3
=======以下為同一事務(wù)=======
dto1Repo.save(dto1);
dto2Repo.save(dto2);
dto3Repo.save(dto3);
寫到這里,發(fā)現(xiàn)寫和讀應(yīng)該屬于同一邏輯:
//repo 加載 domain 的方法:
public domain load(cmd.getId);
按以上想法,重新設(shè)計(jì):
repo 層的畴,從持久化服務(wù)中渊抄,加載需要的 dto,交給 factory 構(gòu)造 domain丧裁。
public SyncOmsOrderTaskDomain loadLast() {
MainTaskDTO mainTaskDTO = null;//omsOrderSyncTaskRepo.getLastMainTask();
List<SubTaskDTO> subTaskDTOList = null;//omsOrderSyncTaskRepo.getSubTaskList(mainTaskDTO.getId());
List<SubTaskRecordDTO> subTaskRecordDTOList = new ArrayList<>();//獲取子任務(wù)執(zhí)行記錄护桦,存在多個(gè)記錄,取最后一次煎娇,或者設(shè)計(jì)上二庵,保證每個(gè)子任務(wù)只有一個(gè)有效的記錄
return MainTaskFactory.create(mainTaskDTO, subTaskDTOList, subTaskRecordDTOList);
}
factory 構(gòu)造 domain 對(duì)象。
public static SyncOmsOrderTaskDomain create(MainTaskDTO mainTaskDTO, List<SubTaskDTO> subTaskDTOList, List<SubTaskRecordDTO> subTaskRecordDTOList) {
String taskId = UUIDHexGenerator.generate();
//對(duì)查詢的值做一些基本校驗(yàn)
return new SyncOmsOrderTaskDomain(mainTaskDTO, subTaskDTOList, subTaskRecordDTOList);
}
domainService 中缓呛,操作 domain 的各種方法催享,包括必要時(shí),會(huì)通過入?yún)⒂窗怼⑴渲玫刃畔⒁蛎睿ㄟ^ factory 構(gòu)造 domain。
@Service
public class MainTaskDomainService {
@Resource
MainTaskRepo mainTaskRepo;
@Resource
MainTaskConfig mainTaskConfig;
@Resource
SubTaskConfig subTaskConfig;
@Resource
ConcurrentReadOrderService concurrentReadOrderService;
//觸發(fā)任務(wù)調(diào)度
public String triggerTask() {
SyncOmsOrderTaskDomain syncOmsOrderTaskDomain = mainTaskRepo.loadLast();
if (syncOmsOrderTaskDomain.ifNotExsit()) {
syncOmsOrderTaskDomain.initTaskByConfig(mainTaskConfig);
syncOmsOrderTaskDomain.initSubTaskByConfig(subTaskConfig);
mainTaskRepo.save(syncOmsOrderTaskDomain);
//細(xì)節(jié)1:實(shí)時(shí)更新子任務(wù)發(fā)送進(jìn)度票髓,如何實(shí)現(xiàn)攀涵?
// 1。子任務(wù)查詢并發(fā)送成功后洽沟,可使用內(nèi)部通知機(jī)制以故,異步通知、更新裆操。
// 2怒详。子任務(wù)查詢發(fā)送、直接調(diào)用repo更新(有點(diǎn)耦合)
concurrentReadOrderService.executeRead(syncOmsOrderTaskDomain);
return "任務(wù)首次運(yùn)行";
} else {
if (syncOmsOrderTaskDomain.isRunning()) {
return "執(zhí)行中";
}
if (syncOmsOrderTaskDomain.needReRun()){
syncOmsOrderTaskDomain.updateReRunTask();
mainTaskRepo.updateReRunTask(syncOmsOrderTaskDomain);
concurrentReadOrderService.executeRead(syncOmsOrderTaskDomain);
return "重新運(yùn)行";
}
if (syncOmsOrderTaskDomain.isFinished()) {
syncOmsOrderTaskDomain.createNextTask();
mainTaskRepo.save(syncOmsOrderTaskDomain);
concurrentReadOrderService.executeRead(syncOmsOrderTaskDomain);
return "運(yùn)行下一周期";
}
}
return "未知狀態(tài)";
}
}
更新子任務(wù)發(fā)送狀態(tài):
執(zhí)行子任務(wù)時(shí)踪区,會(huì)組裝成 runable棘利,交給線程池執(zhí)行查詢,每次查詢到數(shù)據(jù)朽缴,同時(shí)發(fā)送內(nèi)部事件,更新子任務(wù)的發(fā)送數(shù)據(jù)數(shù)以及更新是否發(fā)送完畢狀態(tài)水援。
發(fā)送消息結(jié)構(gòu):
class SubTaskSendCommand {
String taskId;
String subTaskId;
int sendCount;
boolean sendFinished;
}
因?yàn)榘l(fā)送的狀態(tài)數(shù)據(jù)僅需保存密强,所以通過 factory 直接構(gòu)造 domain,通過 repo save蜗元。
SyncOrderTaskDomain domain = MainTaskFactory.generate(subTaskSendCommand);
repo.updateSendInfo(domain);
更新子任務(wù)消費(fèi)狀態(tài):
掃描的數(shù)據(jù)被消費(fèi)處理成功后或渤,需要更新任務(wù)記錄狀態(tài),處理流程和上步類似:
class SubTaskConsumerCommand {
String taskId;
String subTaskId;
int receivedCount;
}
SyncOrderTaskDomain domain = MainTaskFactory.generate(subTaskConsumerCommand);
repo.updateReceiveInfo(domain);
整理流程都收斂在 domainService 層奕扣,factory 主要職責(zé)是構(gòu)造 domain 對(duì)象薪鹦,repo 負(fù)責(zé)加載、保存、更新 domain 信息池磁。
domain 處理各類 command 指定奔害,包括任務(wù)觸發(fā)、更新任務(wù)發(fā)送記錄地熄,更新任務(wù)消費(fèi)記錄华临。
關(guān)于 domain 內(nèi)部模型,如何管理主任務(wù)端考、子任務(wù)雅潭、任務(wù)記錄,后序更新却特。