簡介
XXL-JOB是一個分布式任務(wù)調(diào)度平臺宣谈,其核心設(shè)計目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡單键科、輕量級闻丑、易擴(kuò)展
官方文檔:
https://www.xuxueli.com/xxl-job/
本文基于xxl-job 2.2.0版本
架構(gòu)圖
quick start
1:初始化XxlJobSpringExecutor對象,設(shè)置調(diào)度中心地址及執(zhí)行器的基本屬性等
源碼分析
看下XxlJobSpringExecutor的類繼承圖譜
可以看到實現(xiàn)了SmartInitializingSingleton勋颖,該接口只有一個方法嗦嗡,
* Invoked right at the end of the singleton pre-instantiation phase,
* with a guarantee that all regular singleton beans have been created
* already. {@link ListableBeanFactory#getBeansOfType} calls within
* this method won't trigger accidental side effects during bootstrap.
* <p><b>NOTE:</b> This callback won't be triggered for singleton beans
* lazily initialized on demand after {@link BeanFactory} bootstrap,
* and not for any other bean scope either. Carefully use it for beans
* with the intended bootstrap semantics only.
*/
void afterSingletonsInstantiated();
可以看到,在bean預(yù)初始化階段會調(diào)用當(dāng)前方法
看下XxlJobSpringExecutor的afterSingletonsInstantiated方法邏輯
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);//遍歷查詢所有帶有XxlJob注解的方法的bean饭玲,校驗其合法性侥祭,注冊(存在map中);
// refresh GlueFactory
GlueFactory.refreshInstance(1);//初始化glue factory
// super start
try {
super.start();//關(guān)鍵點在于此:調(diào)用父類的start方法
} catch (Exception e) {
throw new RuntimeException(e);
}
}
```java
XxlJobExecutor的start方法
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);//創(chuàng)建log目錄
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);//初始化adminBiz,
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);//啟動定時清理日志線程
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();//啟動job result 回調(diào)線程
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);//啟動netty,綁定地址,端口
}
進(jìn)一步看一下initEmbedServer方法,獲取當(dāng)前的ip地址,port,調(diào)用EmbedServer的start方法
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
EventLoopGroup bossGroup = new NioEventLoopGroup();//netty bossgroup
EventLoopGroup workerGroup = new NioEventLoopGroup();//netty workgroup
//定義線程池
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
// start server,bind port
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// start registry
startRegistry(appname, address);
// wait util stop
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
}
} finally {
// stop
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
主要就是定義了一個線程池,然后啟動netty server ,先看下 startRegistry(appname, address);
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);//執(zhí)行注冊邏輯,
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
if (!toStop) {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
// registry remove
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
if (!toStop) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
}
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
啟動注冊線程,通過rest方式調(diào)用調(diào)度中心的注冊接口(api/registry)矮冬,接下來看下調(diào)度中心的邏輯:
JobApiController.api方法
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
其實如果存在則更新谈宛,不存在則保存,其中registryGroup對應(yīng)的是Executor,registryKey對應(yīng)的是執(zhí)行器的appName胎署,registryValue對應(yīng)則是當(dāng)前機(jī)器ip及端口
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
至此吆录,執(zhí)行器自動注冊邏輯完畢
接下來看下,調(diào)度中心如何執(zhí)行時是如何傳輸通信的?
調(diào)度中心是如何管理定時任務(wù)定時執(zhí)行的琼牧?
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
private static XxlJobAdminConfig adminConfig = null;
public static XxlJobAdminConfig getAdminConfig() {
return adminConfig;
}
// ---------------------- XxlJobScheduler ----------------------
private XxlJobScheduler xxlJobScheduler;
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
初始化bean時執(zhí)行init方法
public void init() throws Exception {
// init i18n
initI18n();
// admin registry monitor run
JobRegistryMonitorHelper.getInstance().start();
// admin fail-monitor run
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run
JobLosedMonitorHelper.getInstance().start();
// admin trigger pool start
JobTriggerPoolHelper.toStart();
// admin log report start
JobLogReportHelper.getInstance().start();
// start-schedule
JobScheduleHelper.getInstance().start();//主要是這里:定時線程池通過jdbc加索恢筝,遍歷查詢并判定任務(wù)是否可執(zhí)行
logger.info(">>>>>>>>> init xxl-job admin success.");
}
會把符合條件的job調(diào)用trigger觸發(fā)
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
一步步的跟下去,會調(diào)用到:XxlJobRemotingUtil.postBody方法,也就是向注冊的執(zhí)行器發(fā)送http請求,
請求的相關(guān)參數(shù)如下:
接下來就要看執(zhí)行器netty的handler如何處理請求?
執(zhí)行的netty的handler鏈如下:->IdleStateHandler->HttpServerCodec->HttpObjectAggregator0->EmbedHttpServerHandler
依次分別是心跳檢測handler,http url編解碼障陶,http body 請求解析,業(yè)務(wù)處理handler
前三個可以忽略滋恬,不了解的可以學(xué)寫下netty相關(guān)知識,主要關(guān)注業(yè)務(wù)處理handler,
查看channelRead方法邏輯:
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// request parse
//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8);//獲取到請求體參數(shù)
String uri = msg.uri();
HttpMethod httpMethod = msg.method();
boolean keepAlive = HttpUtil.isKeepAlive(msg);
String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
// invoke
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// do invoke
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json
String responseJson = GsonTool.toJson(responseObj);
// write response
writeResponse(ctx, keepAlive, responseJson);
}
});
}
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// valid
if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (accessToken!=null
&& accessToken.trim().length()>0
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
try {
if ("/beat".equals(uri)) {
return executorBiz.beat();//心跳
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {//運行job
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
}
從jobHandlerRepository取出該jobHandler抱究,放到triggerQueue中待執(zhí)行,triggerQueue也由JobThread去隊列中拉取任務(wù)恢氯,執(zhí)行execute方法,將執(zhí)行結(jié)果executeResult放到TriggerCallbackThread隊列中鼓寺,調(diào)用調(diào)度中心rest 接口:api/callback勋拟,更新調(diào)度日志
總結(jié):xxl-job雖然對于中間件來說是輕量級的(學(xué)習(xí)門檻低),但是其很多設(shè)計思路都值得我們學(xué)習(xí)借鑒妈候,涉及到的知識點也很多敢靡,netty,線程池,隊列,spring的知識點等等,個人能力有限苦银,有總結(jié)的不對的地方也請各位指出!