一晋南,前言
XXL-JOB是一個(gè)優(yōu)秀的國(guó)產(chǎn)開源分布式任務(wù)調(diào)度平臺(tái)妆丘,他有著自己的一套調(diào)度注冊(cè)中心,提供了豐富的調(diào)度和阻塞策略等暇务,這些都是可視化的操作劣针,使用起來(lái)十分方便。
由于是國(guó)產(chǎn)的押搪,所以上手還是比較快的树酪,而且他的源碼也十分優(yōu)秀浅碾,因?yàn)槭钦{(diào)試平臺(tái)所以線程這一塊的使用是很頻繁的,特別值得學(xué)習(xí)研究续语。
XXL-JOB一同分為兩個(gè)模塊垂谢,調(diào)度中心模塊和執(zhí)行模塊。具體解釋疮茄,我們copy下官網(wǎng)的介紹:
調(diào)度模塊(調(diào)度中心):
負(fù)責(zé)管理調(diào)度信息滥朱,按照調(diào)度配置發(fā)出調(diào)度請(qǐng)求,自身不承擔(dān)業(yè)務(wù)代碼娃豹。調(diào)度系統(tǒng)與任務(wù)解耦焚虱,提高了系統(tǒng)可用性和穩(wěn)定性,同時(shí)調(diào)度系統(tǒng)性能不再受限于任務(wù)模塊懂版;
支持可視化鹃栽、簡(jiǎn)單且動(dòng)態(tài)的管理調(diào)度信息,包括任務(wù)新建躯畴,更新民鼓,刪除,GLUE開發(fā)和任務(wù)報(bào)警等蓬抄,所有上述操作都會(huì)實(shí)時(shí)生效丰嘉,同時(shí)支持監(jiān)控調(diào)度結(jié)果以及執(zhí)行日志,支持執(zhí)行器Failover嚷缭。執(zhí)行模塊(執(zhí)行器):
負(fù)責(zé)接收調(diào)度請(qǐng)求并執(zhí)行任務(wù)邏輯饮亏。任務(wù)模塊專注于任務(wù)的執(zhí)行等操作,開發(fā)和維護(hù)更加簡(jiǎn)單和高效阅爽;
接收“調(diào)度中心”的執(zhí)行請(qǐng)求路幸、終止請(qǐng)求和日志請(qǐng)求等。
XXL-JOB中“調(diào)度模塊”和“任務(wù)模塊”完全解耦付翁,調(diào)度模塊進(jìn)行任務(wù)調(diào)度時(shí)简肴,將會(huì)解析不同的任務(wù)參數(shù)發(fā)起遠(yuǎn)程調(diào)用,調(diào)用各自的遠(yuǎn)程執(zhí)行器服務(wù)百侧。這種調(diào)用模型類似RPC調(diào)用砰识,調(diào)度中心提供調(diào)用代理的功能,而執(zhí)行器提供遠(yuǎn)程服務(wù)的功能佣渴。
下面看下springboot環(huán)境下的使用方式辫狼,首先看下執(zhí)行器的配置:
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
//調(diào)度中心地址
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
//執(zhí)行器AppName
xxlJobSpringExecutor.setAppname(appname);
//執(zhí)行器注冊(cè)地址,默認(rèn)為空即可
xxlJobSpringExecutor.setAddress(address);
//執(zhí)行器IP [選填]:默認(rèn)為空表示自動(dòng)獲取IP
xxlJobSpringExecutor.setIp(ip);
//執(zhí)行器端口
xxlJobSpringExecutor.setPort(port);
//執(zhí)行器通訊TOKEN
xxlJobSpringExecutor.setAccessToken(accessToken);
//執(zhí)行器運(yùn)行日志文件存儲(chǔ)磁盤路徑
xxlJobSpringExecutor.setLogPath(logPath);
//執(zhí)行器日志文件保存天數(shù)
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
XXL-JOB提供了多種任務(wù)執(zhí)行方式观话,我們今天看下最簡(jiǎn)單的bean執(zhí)行模式予借。如下:
/**
* 1、簡(jiǎn)單任務(wù)示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}
現(xiàn)在在調(diào)度中心稍做配置,我們這段代碼就可以按照一定的策略進(jìn)行調(diào)度執(zhí)行灵迫,是不是很神奇秦叛?我們先看下官網(wǎng)上的解釋:
原理:每個(gè)Bean模式任務(wù)都是一個(gè)Spring的Bean類實(shí)例,它被維護(hù)在“執(zhí)行器”項(xiàng)目的Spring容器中瀑粥。任務(wù)類需要加“@JobHandler(value=”名稱”)”注解挣跋,因?yàn)椤皥?zhí)行器”會(huì)根據(jù)該注解識(shí)別Spring容器中的任務(wù)。任務(wù)類需要繼承統(tǒng)一接口“IJobHandler”狞换,任務(wù)邏輯在execute方法中開發(fā)避咆,因?yàn)椤皥?zhí)行器”在接收到調(diào)度中心的調(diào)度請(qǐng)求時(shí),將會(huì)調(diào)用“IJobHandler”的execute方法修噪,執(zhí)行任務(wù)邏輯查库。
紙上得來(lái)終覺淺,絕知此事要躬行,今天的任務(wù)就是跟著這段話黄琼,我們大體看一波源碼的實(shí)現(xiàn)方式樊销。
二,XxlJobSpringExecutor
XxlJobSpringExecutor其實(shí)看名字脏款,我們都能想到围苫,這是XXL-JOB為了適應(yīng)spring模式的應(yīng)用而開發(fā)的模板類,先看下他的實(shí)現(xiàn)結(jié)構(gòu)撤师。
XxlJobSpringExecutor繼承自XxlJobExecutor
剂府,同時(shí)由于是用在spring環(huán)境,所以實(shí)現(xiàn)了多個(gè)spring內(nèi)置的接口來(lái)配合實(shí)現(xiàn)整個(gè)執(zhí)行器模塊功能剃盾,每個(gè)接口的功能就不細(xì)說(shuō)了腺占,相信大家都可以百度查到。
我們看下初始化方法afterSingletonsInstantiated
:
// start
@Override
public void afterSingletonsInstantiated() {
//注冊(cè)每個(gè)任務(wù)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
主流程看上去是比較簡(jiǎn)單的痒谴,首先是注冊(cè)每一個(gè)JobHandler,然后進(jìn)行初始化操作湾笛,GlueFactory.refreshInstance(1)
是為了另一種調(diào)用模式時(shí)用到的,主要是用到了groovy
闰歪,不在這次的分析中,我們就不看了蓖墅。我們繼續(xù)看下如何注冊(cè)JobHandler的库倘。
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// 遍歷所有beans,取出所有包含有@XxlJob的方法
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
//遍歷@XxlJob方法论矾,取出executeMethod以及注解中對(duì)應(yīng)的initMethod, destroyMethod進(jìn)行注冊(cè)
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
executeMethod.setAccessible(true);
// init and destory
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
}
// 注冊(cè) jobhandler
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}
}
}
XxlJobSpringExecutor由于實(shí)現(xiàn)了ApplicationContextAware
教翩,所以通過(guò)applicationContext可以獲得所有容器中的bean實(shí)例,再通過(guò)MethodIntrospector來(lái)過(guò)濾出所有包含@XxlJob注解的方法贪壳,最后把對(duì)應(yīng)的executeMethod以及注解中對(duì)應(yīng)的initMethod, destroyMethod進(jìn)行注冊(cè)到jobHandlerRepository
中饱亿,jobHandlerRepository
是一個(gè)線程安全ConcurrentMap,MethodJobHandler實(shí)現(xiàn)自IJobHandler
接口的一個(gè)模板類,主要作用就是通過(guò)反射去執(zhí)行對(duì)應(yīng)的方法彪笼∽曜ⅲ看到這,之前那句話任務(wù)類需要加“@JobHandler(value=”名稱”)”注解配猫,因?yàn)椤皥?zhí)行器”會(huì)根據(jù)該注解識(shí)別Spring容器中的任務(wù)幅恋。我們就明白了。
public class MethodJobHandler extends IJobHandler {
....
public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
this.target = target;
this.method = method;
this.initMethod = initMethod;
this.destroyMethod = destroyMethod;
}
@Override
public void execute() throws Exception {
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length > 0) {
method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types
} else {
method.invoke(target);
}
}
三泵肄,執(zhí)行服務(wù)器initEmbedServer
看完上面的JobHandler注冊(cè)捆交,后面緊著就是執(zhí)行器模塊的啟動(dòng)操作了,下面看下start方法:
public void start() throws Exception {
// 初始化日志path
XxlJobFileAppender.initLogPath(logPath);
// 注冊(cè)adminBizList
initAdminBizList(adminAddresses, accessToken);
// 初始化日志清除線程
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 初始化回調(diào)線程腐巢,用來(lái)把執(zhí)行結(jié)果回調(diào)給調(diào)度中心
TriggerCallbackThread.getInstance().start();
// 執(zhí)行服務(wù)器啟動(dòng)
initEmbedServer(address, ip, port, appname, accessToken);
}
前幾個(gè)操作品追,我們就不細(xì)看了,大家有興趣的可以自行查看冯丙,我們直接進(jìn)入initEmbedServer方法查看內(nèi)部服務(wù)器如何啟動(dòng)肉瓦,以及向調(diào)試中心注冊(cè)的。
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
...
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
public void start(final String address, final int port, final String appname, final String accessToken) {
```
// 啟動(dòng)netty服務(wù)器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// 執(zhí)行向調(diào)度中心注冊(cè)
startRegistry(appname, address);
```
}
因?yàn)閳?zhí)行器模塊本身需要有通訊交互的需求银还,不然調(diào)度中心是無(wú)法調(diào)用他的风宁,所以內(nèi)嵌了一個(gè)netty服務(wù)器進(jìn)行通信。啟動(dòng)成功后蛹疯,正式向調(diào)試中心執(zhí)行注冊(cè)請(qǐng)求戒财。我們直接看注冊(cè)的代碼:
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
//執(zhí)行注冊(cè)請(qǐng)求
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
XxlJobRemotingUtil.postBody
就是個(gè)符合XXL-JOB規(guī)范的restful的http請(qǐng)求處理,里面不止有注冊(cè)請(qǐng)求捺弦,還有下線請(qǐng)求饮寞,回調(diào)請(qǐng)求等,礙于篇幅列吼,就不一一展示了幽崩,調(diào)度中心接到對(duì)應(yīng)的請(qǐng)求,會(huì)有對(duì)應(yīng)的DB處理:
// services mapping
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
跟到這里寞钥,我們就已經(jīng)大概了解了整個(gè)注冊(cè)的流程慌申。同樣當(dāng)調(diào)度中心向我們執(zhí)行器發(fā)送請(qǐng)求,譬如說(shuō)執(zhí)行任務(wù)調(diào)度的請(qǐng)求時(shí)理郑,也是同樣的http請(qǐng)求發(fā)送我們上面分析的執(zhí)行器中內(nèi)嵌netty服務(wù)進(jìn)行操作蹄溉,這邊只展示調(diào)用方法:
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
這樣,我們執(zhí)行器模塊收到請(qǐng)求后會(huì)執(zhí)行我們上面注冊(cè)中的jobHandle進(jìn)行對(duì)應(yīng)的方法執(zhí)行您炉,執(zhí)行器會(huì)將請(qǐng)求存入“異步執(zhí)行隊(duì)列”并且立即響應(yīng)調(diào)度中心柒爵,異步運(yùn)行對(duì)應(yīng)方法。這樣一套注冊(cè)和執(zhí)行的流程就大致走下來(lái)了赚爵。
四棉胀,結(jié)尾
當(dāng)然事實(shí)上XXL-JOB的代碼還有許多豐富的特性法瑟,礙于本人實(shí)力不能一一道明,我這也是拋轉(zhuǎn)引玉唁奢,只是把最基礎(chǔ)的一些地方介紹給大家霎挟,有興趣的話,大家可以自行查閱相關(guān)代碼驮瞧,總的來(lái)說(shuō)氓扛,畢竟是國(guó)產(chǎn)開源的優(yōu)秀項(xiàng)目,還是值得贊賞的论笔,也希望國(guó)內(nèi)以后有越來(lái)越多優(yōu)秀開源框架采郎。