XX-JOB閱讀筆記(一):執(zhí)行器注冊(cè)到管理器實(shí)現(xiàn)方式

這里不再對(duì)開源框架XX-JOB做介紹忽肛,單純介紹部分功能實(shí)現(xiàn)原理。本篇記錄執(zhí)行器Executor如何注冊(cè)到任務(wù)管理器Admin留晚。
本系列文章基于V2.1.0版本介紹客冈,附github上架構(gòu)圖。

image.png

Executor將本執(zhí)行器的ip地址及端口號(hào)注冊(cè)到Admin赏殃,實(shí)際是保存在數(shù)據(jù)庫(kù)表xxl_job_registry中敷待,保存后地址如:
image.png

然后Admin在根據(jù)不同策略獲取這些地址。

整體大概流程

image.png

工程目錄

image.png

以springboot為例

初始化配置文件XxlJobConfig.java -->創(chuàng)建XxlJobSpringExecutor.java(set 執(zhí)行器和管理器各種信息) bean -->指定init和destory方法 --> XxlJobSpringExecutor執(zhí)行start()

//從application.properties文件中讀取admin和executor信息仁热,并初始化到XxlJobSpringExecutor類中榜揖,指定init和destory方法
@Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        //設(shè)置admin地址,eg:http://127.0.0.1:8080/xxl-job-admin
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        //設(shè)置執(zhí)行器名稱抗蠢,eg:xxl-job-executor-sample
        xxlJobSpringExecutor.setAppName(appName);
        //設(shè)置執(zhí)行器ip和port
        xxlJobSpringExecutor.setIp(ip);       
        xxlJobSpringExecutor.setPort(port);
         //設(shè)置執(zhí)行器訪問口令
        xxlJobSpringExecutor.setAccessToken(accessToken);
       //設(shè)置日志保存路徑
        xxlJobSpringExecutor.setLogPath(logPath);
         //設(shè)置日志保存天數(shù)
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

接著XxlJobSpringExecutor執(zhí)行start方法

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {
    @Override
    public void start() throws Exception {
        // 初始化執(zhí)行器上面的任務(wù)
        initJobHandlerRepository(applicationContext);
        // refresh GlueFactory
        GlueFactory.refreshInstance(1);
        // super start
        super.start();
    }

接著父類XxlJobExecutor 執(zhí)行start方法

// ---------------------- start + stop ----------------------
    public void start() throws Exception {
        //設(shè)置日志路徑
        // init logpath
        XxlJobFileAppender.initLogPath(logPath);
        //設(shè)置admin地址及執(zhí)行器訪問口令
        // init invoker, admin-client
        initAdminBizList(adminAddresses, accessToken);
        //設(shè)置日志清理線程參數(shù)
        // init JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);
        //任務(wù)執(zhí)行結(jié)果回調(diào)線程(包含回調(diào)失敗后重試機(jī)制)
        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();
        // init executor-server
        //設(shè)置執(zhí)行器ip和port
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
        //注冊(cè)執(zhí)行器
        initRpcProvider(ip, port, appName, accessToken);
    }

接著看上面的initAdminBizList(adminAddresses, accessToken)方法举哟,這一步是初始化Admin的值,以及初始化執(zhí)行器訪問口令迅矛,下面看具體執(zhí)行邏輯

//初始化各種rpc的各種協(xié)議
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            for (String address: adminAddresses.trim().split(",")) {
                if (address!=null && address.trim().length()>0) {
                    String addressUrl = address.concat(AdminBiz.MAPPING);
                    // 這里的getObject() 返回的是一個(gè)動(dòng)態(tài)代理對(duì)象妨猩,代理對(duì)象在使用方法時(shí),并不是真實(shí)的自己調(diào)用秽褒,而是委托尤其關(guān)聯(lián)到的hander對(duì)象的invoke方法來調(diào)用
                    AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
                            NetEnum.NETTY_HTTP,
                            serializer,
                            CallType.SYNC,
                            LoadBalance.ROUND,
                            AdminBiz.class,
                            null,
                            3000,
                            addressUrl,
                            accessToken,
                            null,
                            null
                    ).getObject();//getObject方法比較重要
                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    adminBizList.add(adminBiz);
                }
            }
        }
    }
public Object getObject() {
                //使用動(dòng)態(tài)代理壶硅,通過此方法發(fā)送請(qǐng)求到Admin的/api接口,api接口收到請(qǐng)求后销斟,解析出具體的方法和參數(shù)庐椒,獲取到對(duì)應(yīng)的Bean,通過反射執(zhí)行具體的方法蚂踊,最終實(shí)現(xiàn)調(diào)用AdminBizImpl.registry()
        return Proxy.newProxyInstance(Thread.currentThread()
                .getContextClassLoader(), new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

接著看initRpcProvider(ip, port, appName, accessToken)方法:

private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
        // init, provider factory
        String address = IpUtil.getIpPort(ip, port);
        Map<String, String> serviceRegistryParam = new HashMap<String, String>();
        serviceRegistryParam.put("appName", appName);
        serviceRegistryParam.put("address", address);
        xxlRpcProviderFactory = new XxlRpcProviderFactory();
        //指定執(zhí)行器注冊(cè)類為ExecutorServiceRegistry
        xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
        // add services
        xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
        //啟動(dòng)執(zhí)行器注冊(cè)工廠
        // start
        xxlRpcProviderFactory.start();

    }

接著是 xxlRpcProviderFactory.start()方法

public void start() throws Exception {
        // start server
        serviceAddress = IpUtil.getIpPort(this.ip, port);
        server = netType.serverClass.newInstance();
        server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
            @Override
            public void run() throws Exception {
                // start registry
                if (serviceRegistryClass != null) {
                    serviceRegistry = serviceRegistryClass.newInstance();
                                        //執(zhí)行器注冊(cè)類啟動(dòng)
                    serviceRegistry.start(serviceRegistryParam);
                    if (serviceData.size() > 0) {
                        serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                    }
                }
            }
        });
        server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
            @Override
            public void run() {
                // stop registry
                if (serviceRegistry != null) {
                    if (serviceData.size() > 0) {
                        serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                    }
                    serviceRegistry.stop();
                    serviceRegistry = null;
                }
            }
        });
        server.start(this);
    }

回到ExecutorServiceRegistry的start方法约谈,

public static class ExecutorServiceRegistry extends ServiceRegistry {
        @Override
        public void start(Map<String, String> param) {
            //此處進(jìn)行注冊(cè)
            // start registry
            ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
        }
        @Override
        public void stop() {
            // stop registry
            ExecutorRegistryThread.getInstance().toStop();
        }
        @Override
        public boolean registry(Set<String> keys, String value) {
            return false;
        }
        @Override
        public boolean remove(Set<String> keys, String value) {
            return false;
        }
        @Override
        public Map<String, TreeSet<String>> discovery(Set<String> keys) {
            return null;
        }
        @Override
        public TreeSet<String> discovery(String key) {
            return null;
        }
    }

進(jìn)入ExecutorRegistryThread.start()方法,

 registryThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // registry
                while (!toStop) {
                    try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {//此處注冊(cè)
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }

                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }

Admin中接受Executor請(qǐng)求的入口

/**
 * Executor調(diào)用Admin入口,接受到請(qǐng)求后棱诱,在進(jìn)行反思操作泼橘,實(shí)現(xiàn)調(diào)用具體方法
 * Created by xuxueli on 17/5/10.
 */
@Controller
public class JobApiController implements InitializingBean {
    @Override
    public void afterPropertiesSet() throws Exception {
    }
    //執(zhí)行器調(diào)用管理器方法入口
    @RequestMapping(AdminBiz.MAPPING)
    @PermissionLimit(limit=false)
    public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {        
        XxlJobScheduler.invokeAdminService(request, response);
    }
}

XxlJobScheduler.invokeAdminService(request, response) ->servletServerHandler.handle(null, request, response)->xxlRpcProviderFactory.invokeService(xxlRpcRequest)

public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
        //  make response
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());

        // match service bean 獲取匹配的Bean
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        Object serviceBean = serviceData.get(serviceKey);

        // valid
        if (serviceBean == null) {
            xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
            return xxlRpcResponse;
        }

        if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
            xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
            return xxlRpcResponse;
        }
        if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
            xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
            return xxlRpcResponse;
        }

        try {
            // invoke
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            //反射調(diào)用具體方法
            Object result = method.invoke(serviceBean, parameters);

            /*FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            Object result = serviceFastMethod.invoke(serviceBean, parameters);*/

            xxlRpcResponse.setResult(result);
        } catch (Throwable t) {
            // catch error
            logger.error("xxl-rpc provider invokeService error.", t);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
        }

        return xxlRpcResponse;
    }

Admin中實(shí)現(xiàn)類

@Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        //注冊(cè)信息入庫(kù)
        int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
        if (ret < 1) {
            xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());

            // fresh
            freshGroupRegistryInfo(registryParam);
        }
        return ReturnT.SUCCESS;
    }

其中Executor動(dòng)態(tài)代理AdminBiz接口和Admin的/api動(dòng)態(tài)反射執(zhí)行具體方法屬于作者自研RPC框架部分,本篇只做了注冊(cè)部分和解析部分的介紹军俊,后續(xù)會(huì)單獨(dú)介紹自研RPC框架部分侥加。
到此,執(zhí)行器的地址就已經(jīng)完全注冊(cè)到管理器中粪躬。

閱讀原文關(guān)注公眾號(hào),更多文章持續(xù)更新中昔穴,原文地址:
https://mp.weixin.qq.com/s/sJarz6_zBWzKtIr-qFIbsw

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末镰官,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子吗货,更是在濱河造成了極大的恐慌泳唠,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宙搬,死亡現(xiàn)場(chǎng)離奇詭異笨腥,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)勇垛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門脖母,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人闲孤,你說我怎么就攤上這事谆级。” “怎么了讼积?”我有些...
    開封第一講書人閱讀 156,852評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵肥照,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我勤众,道長(zhǎng)舆绎,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,408評(píng)論 1 283
  • 正文 為了忘掉前任们颜,我火速辦了婚禮吕朵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘掌桩。我一直安慰自己边锁,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評(píng)論 5 384
  • 文/花漫 我一把揭開白布波岛。 她就那樣靜靜地躺著茅坛,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上贡蓖,一...
    開封第一講書人閱讀 49,772評(píng)論 1 290
  • 那天曹鸠,我揣著相機(jī)與錄音,去河邊找鬼斥铺。 笑死彻桃,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的晾蜘。 我是一名探鬼主播邻眷,決...
    沈念sama閱讀 38,921評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼剔交!你這毒婦竟也來了肆饶?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,688評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤岖常,失蹤者是張志新(化名)和其女友劉穎驯镊,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體竭鞍,經(jīng)...
    沈念sama閱讀 44,130評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡板惑,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了偎快。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片冯乘。...
    茶點(diǎn)故事閱讀 38,617評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖滨砍,靈堂內(nèi)的尸體忽然破棺而出往湿,到底是詐尸還是另有隱情,我是刑警寧澤惋戏,帶...
    沈念sama閱讀 34,276評(píng)論 4 329
  • 正文 年R本政府宣布领追,位于F島的核電站,受9級(jí)特大地震影響响逢,放射性物質(zhì)發(fā)生泄漏绒窑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評(píng)論 3 312
  • 文/蒙蒙 一舔亭、第九天 我趴在偏房一處隱蔽的房頂上張望些膨。 院中可真熱鬧,春花似錦钦铺、人聲如沸订雾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽洼哎。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間噩峦,已是汗流浹背锭沟。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評(píng)論 1 265
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留识补,地道東北人族淮。 一個(gè)月前我還...
    沈念sama閱讀 46,315評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像凭涂,于是被迫代替她去往敵國(guó)和親祝辣。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容