xxl-job源碼分析

簡介

XXL-JOB是一個分布式任務(wù)調(diào)度平臺宣谈,其核心設(shè)計目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡單键科、輕量級闻丑、易擴(kuò)展

官方文檔:

https://www.xuxueli.com/xxl-job/

本文基于xxl-job 2.2.0版本

架構(gòu)圖

image-20210508141537828.png

quick start

1:初始化XxlJobSpringExecutor對象,設(shè)置調(diào)度中心地址及執(zhí)行器的基本屬性等

image-20210508141752193.png
image-20210508181000961.png

源碼分析

看下XxlJobSpringExecutor的類繼承圖譜

image-20210508142403321.png

可以看到實現(xiàn)了SmartInitializingSingleton勋颖,該接口只有一個方法嗦嗡,

 * Invoked right at the end of the singleton pre-instantiation phase,
 * with a guarantee that all regular singleton beans have been created
 * already. {@link ListableBeanFactory#getBeansOfType} calls within
 * this method won't trigger accidental side effects during bootstrap.
 * <p><b>NOTE:</b> This callback won't be triggered for singleton beans
 * lazily initialized on demand after {@link BeanFactory} bootstrap,
 * and not for any other bean scope either. Carefully use it for beans
 * with the intended bootstrap semantics only.
 */
void afterSingletonsInstantiated();


可以看到,在bean預(yù)初始化階段會調(diào)用當(dāng)前方法

看下XxlJobSpringExecutor的afterSingletonsInstantiated方法邏輯

@Override
public void afterSingletonsInstantiated() {

// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/

// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);//遍歷查詢所有帶有XxlJob注解的方法的bean饭玲,校驗其合法性侥祭,注冊(存在map中);

// refresh GlueFactory
GlueFactory.refreshInstance(1);//初始化glue factory

// super start
try {
    super.start();//關(guān)鍵點在于此:調(diào)用父類的start方法
} catch (Exception e) {
    throw new RuntimeException(e);
}

}


```java
XxlJobExecutor的start方法
public void start() throws Exception {

    // init logpath
    XxlJobFileAppender.initLogPath(logPath);//創(chuàng)建log目錄

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);//初始化adminBiz,


    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);//啟動定時清理日志線程

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();//啟動job result 回調(diào)線程

    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);//啟動netty,綁定地址,端口
}

進(jìn)一步看一下initEmbedServer方法,獲取當(dāng)前的ip地址,port,調(diào)用EmbedServer的start方法

public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {

        @Override
        public void run() {

            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup();//netty bossgroup
            EventLoopGroup workerGroup = new NioEventLoopGroup();//netty workgroup
            //定義線程池
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });


            try {
                // start server,bind port
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registry
                startRegistry(appname, address);

                // wait util stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                if (e instanceof InterruptedException) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } else {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                }
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

        }

    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}

主要就是定義了一個線程池,然后啟動netty server ,先看下 startRegistry(appname, address);

registryThread = new Thread(new Runnable() {
    @Override
    public void run() {

        // registry
        while (!toStop) {
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registry(registryParam);//執(zhí)行注冊邏輯,
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                    }

                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }

            }

            try {
                if (!toStop) {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                }
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                }
            }
        }

        // registry remove
        try {
            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                    if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                        registryResult = ReturnT.SUCCESS;
                        logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        break;
                    } else {
                        logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                    }

                }

            }
        } catch (Exception e) {
            if (!toStop) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

    }
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();

啟動注冊線程,通過rest方式調(diào)用調(diào)度中心的注冊接口(api/registry)矮冬,接下來看下調(diào)度中心的邏輯:

JobApiController.api方法
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);

其實如果存在則更新谈宛,不存在則保存,其中registryGroup對應(yīng)的是Executor,registryKey對應(yīng)的是執(zhí)行器的appName胎署,registryValue對應(yīng)則是當(dāng)前機(jī)器ip及端口

int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
    xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

至此吆录,執(zhí)行器自動注冊邏輯完畢

接下來看下,調(diào)度中心如何執(zhí)行時是如何傳輸通信的?

調(diào)度中心是如何管理定時任務(wù)定時執(zhí)行的琼牧?

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }


    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

初始化bean時執(zhí)行init方法

public void init() throws Exception {
    // init i18n
    initI18n();

    // admin registry monitor run
    JobRegistryMonitorHelper.getInstance().start();

    // admin fail-monitor run
    JobFailMonitorHelper.getInstance().start();

    // admin lose-monitor run
    JobLosedMonitorHelper.getInstance().start();

    // admin trigger pool start
    JobTriggerPoolHelper.toStart();

    // admin log report start
    JobLogReportHelper.getInstance().start();

    // start-schedule
    JobScheduleHelper.getInstance().start();//主要是這里:定時線程池通過jdbc加索恢筝,遍歷查詢并判定任務(wù)是否可執(zhí)行

    logger.info(">>>>>>>>> init xxl-job admin success.");
}

會把符合條件的job調(diào)用trigger觸發(fā)

JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

一步步的跟下去,會調(diào)用到:XxlJobRemotingUtil.postBody方法,也就是向注冊的執(zhí)行器發(fā)送http請求,

請求的相關(guān)參數(shù)如下:

image-20210508170323887.png

接下來就要看執(zhí)行器netty的handler如何處理請求?

執(zhí)行的netty的handler鏈如下:->IdleStateHandler->HttpServerCodec->HttpObjectAggregator0->EmbedHttpServerHandler

依次分別是心跳檢測handler,http url編解碼障陶,http body 請求解析,業(yè)務(wù)處理handler

前三個可以忽略滋恬,不了解的可以學(xué)寫下netty相關(guān)知識,主要關(guān)注業(yè)務(wù)處理handler,

查看channelRead方法邏輯:

@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

    // request parse
    //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); 
    String requestData = msg.content().toString(CharsetUtil.UTF_8);//獲取到請求體參數(shù)
    String uri = msg.uri();
    HttpMethod httpMethod = msg.method();
    boolean keepAlive = HttpUtil.isKeepAlive(msg);
    String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

            // to json
            String responseJson = GsonTool.toJson(responseObj);

            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

    // valid
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    if (accessToken!=null
            && accessToken.trim().length()>0
            && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    try {
        if ("/beat".equals(uri)) {
            return executorBiz.beat();//心跳
        } else if ("/idleBeat".equals(uri)) {
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        } else if ("/run".equals(uri)) {//運行job
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) {
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) {
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}

從jobHandlerRepository取出該jobHandler抱究,放到triggerQueue中待執(zhí)行,triggerQueue也由JobThread去隊列中拉取任務(wù)恢氯,執(zhí)行execute方法,將執(zhí)行結(jié)果executeResult放到TriggerCallbackThread隊列中鼓寺,調(diào)用調(diào)度中心rest 接口:api/callback勋拟,更新調(diào)度日志

總結(jié):xxl-job雖然對于中間件來說是輕量級的(學(xué)習(xí)門檻低),但是其很多設(shè)計思路都值得我們學(xué)習(xí)借鑒妈候,涉及到的知識點也很多敢靡,netty,線程池,隊列,spring的知識點等等,個人能力有限苦银,有總結(jié)的不對的地方也請各位指出!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末啸胧,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子幔虏,更是在濱河造成了極大的恐慌纺念,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件想括,死亡現(xiàn)場離奇詭異陷谱,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)瑟蜈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進(jìn)店門烟逊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人铺根,你說我怎么就攤上這事宪躯。” “怎么了位迂?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵访雪,是天一觀的道長予颤。 經(jīng)常有香客問我,道長冬阳,這世上最難降的妖魔是什么蛤虐? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮肝陪,結(jié)果婚禮上驳庭,老公的妹妹穿的比我還像新娘。我一直安慰自己氯窍,他們只是感情好饲常,可當(dāng)我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著狼讨,像睡著了一般贝淤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上政供,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天播聪,我揣著相機(jī)與錄音,去河邊找鬼布隔。 笑死离陶,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的衅檀。 我是一名探鬼主播招刨,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼哀军!你這毒婦竟也來了沉眶?” 一聲冷哼從身側(cè)響起杉适,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤谎倔,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后淘衙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體传藏,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡腻暮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年彤守,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片哭靖。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡具垫,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出试幽,到底是詐尸還是另有隱情筝蚕,我是刑警寧澤,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站起宽,受9級特大地震影響洲胖,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜坯沪,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一绿映、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧腐晾,春花似錦叉弦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至巨柒,卻和暖如春樱拴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背洋满。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工疹鳄, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人芦岂。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓瘪弓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親禽最。 傳聞我的和親對象是個殘疾皇子腺怯,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容