【ElasticJob源碼解析】任務(wù)調(diào)度器

作業(yè)需要執(zhí)行攘蔽,必然需要一個調(diào)度器龙屉,去統(tǒng)籌作業(yè)執(zhí)行的邏輯,這也是ElasticJob的核心內(nèi)容秩彤;ElasticJob依賴注冊中心實現(xiàn)分片叔扼,所以調(diào)度器主要需要做的的事就是在任務(wù)啟動的時候,將任務(wù)的信息寫到注冊中心漫雷,其次就是啟動任務(wù)瓜富,具體的執(zhí)行邏輯需要慢慢分析;

1降盹,作業(yè)調(diào)度器-JobScheduler

首先來看看JobScheduler的構(gòu)造器与柑;

1.1,構(gòu)造器

private JobScheduler(final CoordinatorRegistryCenter regCenter, 
                    final LiteJobConfiguration liteJobConfig, 
                    final JobEventBus jobEventBus, 
                    final ElasticJobListener... elasticJobListeners) {
    JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
    this.liteJobConfig = liteJobConfig;
    this.regCenter = regCenter;
    List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
    setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
    schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
    jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
  • 首先使用作業(yè)注冊表JobRegistry注冊當(dāng)前的作業(yè)信息蓄坏,標(biāo)注唯一作業(yè)的方式是价捧,使用ip地址+虛擬機(jī)進(jìn)程+作業(yè)名稱,來唯一標(biāo)識某臺機(jī)器上運行的某個項目的某個作業(yè)實例涡戳;
  • 其實是為監(jiān)聽器設(shè)置值结蟋,這個以后再講;
  • 最后就是兩個門面類的初始化賦值渔彰;

1.2嵌屎,啟動調(diào)度器-init

啟動調(diào)度器使用的是init方法,這個方法可以理解為啟動一個定時任務(wù)恍涂,只不過這個定時任務(wù)功能比較強(qiáng)大宝惰,不僅僅是簡單的定時執(zhí)行;

首先來看init方法:

public void init() {
    LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
    JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
    JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
    schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
    jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
  • 第一行首先更新該作業(yè)的配置信息到注冊中心再沧,更新的條件是注冊中心中尼夺,名為jobName+"config"的節(jié)點不存在,或者LiteJobConfiguration的overWrite字段設(shè)置為true時,會強(qiáng)制更新淤堵;
  • 第二行代碼是將本次作業(yè)的分片總數(shù)注冊到注冊表JobRegistry中寝衫;
  • 然后構(gòu)建一個作業(yè)調(diào)度器的控制器JobScheduleController,這個控制器粘勒,可以實現(xiàn)調(diào)度竞端,暫停,恢復(fù)庙睡,關(guān)閉等操作,使用的是quartz的Scheduler來實現(xiàn)的技俐,所以構(gòu)造是需要一個Scheduler對象乘陪,同時還需要一個JobDetail對象,這兩個對象后面講雕擂;
  • 控制器構(gòu)造完成后啡邑,將控制器和注冊中心注冊到作業(yè)注冊表(JobRegistry)中,也就是說這個注冊表中包含了所有作業(yè)的所有信息井赌,當(dāng)后期需要時谤逼,可以從注冊表中獲取仇穗;
  • 再然后使用門面裝飾類schedulerFacade流部,注冊初始化信息到注冊中心,這里使用了裝飾器模式纹坐,將大量的邏輯分在在里面枝冀,需要另開一篇講解,總之他的任務(wù)就是講作業(yè)的各項信息寫到注冊中心耘子;
  • 最后果漾,使用作業(yè)調(diào)度器的控制器,啟動調(diào)度操作谷誓,也就是啟動定時任務(wù)绒障,使用quartz來實現(xiàn)的;

再來看看作業(yè)調(diào)度器的控制器需要的JobDetail是如何構(gòu)造的:

private JobDetail createJobDetail(final String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
    Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        try {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
        } catch (final ReflectiveOperationException ex) {
            throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
        }
    }
    return result;
}
  • 構(gòu)造方式很簡單捍歪,使用一個builder對象户辱,然后將elasticJob對象和jobFacade對象放置進(jìn)去;
  • 但是這里面還有一點很有意思的東西费封,newJob的對象LiteJob焕妙,他是quartz的job接口的實現(xiàn)類,他內(nèi)部擁有兩個屬性elasticJob和jobFacade弓摘,而后面的代碼向jobDataModel中(ELASTIC_JOB_DATA_MAP_KEY->elasticJob焚鹊;JOB_FACADE_DATA_MAP_KEY->jobFacade)放置的也是這兩個屬性,quartz框架會將jobDataModel中的鍵值對賦值給newJob對象中對應(yīng)的屬性;
  • 還需要注意的是末患,當(dāng)jobClass為ScriptJob時研叫,elasticJob是沒有放置到j(luò)obDataModel中的,但是沒關(guān)系璧针,在LiteJob中調(diào)用的JobExecutorFactory在調(diào)用時嚷炉,如果elasticJob為null,那么就默認(rèn)執(zhí)行ScriptJobExecutor探橱;

再來看看Scheduler對象是如何構(gòu)建:

 private Scheduler createScheduler() {
    Scheduler result;
    try {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getBaseQuartzProperties());
        result = factory.getScheduler();
        result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
    return result;
}

private Properties getBaseQuartzProperties() {
    Properties result = new Properties();
    result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
    result.put("org.quartz.threadPool.threadCount", "1");
    result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
    result.put("org.quartz.jobStore.misfireThreshold", "1");
    result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
    result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
    return result;
}
  • 這里注冊了一個JobTriggerListener申屹,用來設(shè)置任務(wù)被錯過執(zhí)行的標(biāo)記;
  • JobShutdownHookPlugin是用來在Scheduler關(guān)閉的時候做掃尾工作的隧膏;

總結(jié):

  • 調(diào)度器的啟動邏輯較為復(fù)雜哗讥,大量邏輯包含在內(nèi)部尚未解析,但是他主要做的事:
    • 向注冊中心寫入各種節(jié)點信息胞枕;
    • 向作業(yè)注冊表(JobRegistry)中杆煞,寫入作業(yè)的各種信息;
    • 構(gòu)建并啟動quartz的調(diào)度器腐泻,也就是啟動定時任務(wù)决乎,執(zhí)行本次作業(yè);

2派桩,作業(yè)的具體執(zhí)行

從上面代碼也可以看出构诚,每次定時任務(wù)出發(fā)的時候,quratz會調(diào)用窄坦,實現(xiàn)了Job接口的LiteJob類的execute方法唤反,那么我們就從這兒開始看起;

public final class LiteJob implements Job {
    
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}

  • 上面也分析到了鸭津,這個類的構(gòu)造器只有空構(gòu)造函數(shù)彤侍,有quartz框架new出來,并且會設(shè)置elasticJob和jobFacade的值逆趋,值的來源是盏阶,構(gòu)建JobDetail對象時,設(shè)置到j(luò)obDataMap中的同名屬性闻书;
  • 然后使用JobExecutorFactory根據(jù)elasticJob的類型的不同挑選對應(yīng)的作業(yè)執(zhí)行器名斟,代碼如下;
public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
        if (null == elasticJob) {
            return new ScriptJobExecutor(jobFacade);
        }
        if (elasticJob instanceof SimpleJob) {
            return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
        }
        if (elasticJob instanceof DataflowJob) {
            return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
        }
        throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
    }
  • 在構(gòu)建JobDetail時說過魄眉,如果作業(yè)類型為ScriptJob砰盐,那么elasticJob是沒有值的,在此處坑律,如果沒有值岩梳,就返回ScriptJobExecutor,與之前相對應(yīng);
  • 其他的冀值,根據(jù)作業(yè)類型也物,獲取相應(yīng)的執(zhí)行器,如果沒有列疗,直接報錯滑蚯;

2.1,簡單作業(yè)執(zhí)行器-SimpleJobExecutor

public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
    
    private final SimpleJob simpleJob;
    
    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }
    
    @Override
    protected void process(final ShardingContext shardingContext) {
        simpleJob.execute(shardingContext);
    }
}

這個類沒什么講的抵栈,只是向父類傳遞了一個參數(shù)告材,然后重寫了一個方法,但是這個方法調(diào)用的是我們實現(xiàn)SimpleJob時復(fù)寫的方法古劲,也就是我們自己寫的業(yè)務(wù)邏輯在此處執(zhí)行创葡;主要還是看看他的父類中干了什么,先來看看父類的構(gòu)造器绢慢;

2.1.1,執(zhí)行器的構(gòu)造器

protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
    this.jobFacade = jobFacade;
    jobRootConfig = jobFacade.loadJobRootConfiguration(true);
    jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
    executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
    jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
    itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
}
  • 根據(jù)子類傳過來的JobFacade對象洛波,獲取JobRootConfig胰舆,JobName;
  • 以及執(zhí)行作業(yè)的線程池ExecutorService蹬挤,和作業(yè)的異常處理器jobExceptionHandler缚窿,這兩個不同類型的字段是通過同一方法(getHandler)獲取的,值得借鑒焰扳;
private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
    String handlerClassName = jobRootConfig.getTypeConfig().getCoreConfig().getJobProperties().get(jobPropertiesEnum);
    try {
        Class<?> handlerClass = Class.forName(handlerClassName);
        if (jobPropertiesEnum.getClassType().isAssignableFrom(handlerClass)) {
            return handlerClass.newInstance();
        }
        return getDefaultHandler(jobPropertiesEnum, handlerClassName);
    } catch (final ReflectiveOperationException ex) {
        return getDefaultHandler(jobPropertiesEnum, handlerClassName);
    }
}

實現(xiàn)邏輯是這樣的:

  • 方法參數(shù)給定一個 JobProperties.JobPropertiesEnum 類型的參數(shù)作為默認(rèn)值倦零;
  • 如果能從核心配置類(JobCoreConfiguration)中獲取,定義作業(yè)配置的值吨悍,那么就使用配置的值扫茅,如果沒有,或者配置的不是方法參數(shù)中期望的類型育瓜,那么就使用參數(shù)中的默認(rèn)值葫隙;
  • 獲取默認(rèn)值的方法如下;
private Object getDefaultHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum, final String handlerClassName) {
    log.warn("Cannot instantiation class '{}', use default '{}' class.", handlerClassName, jobPropertiesEnum.getKey());
    try {
        return Class.forName(jobPropertiesEnum.getDefaultValue()).newInstance();
    } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
        throw new JobSystemException(e);
    }
}

2.1.2躏仇,執(zhí)行器的execute方法

該方法邏輯比較長恋脚,Elasticjob也分成了幾段來實現(xiàn),我們就一段一段的看焰手;

  1. 第一段
public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
    }
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                    shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    try {
        jobFacade.beforeJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    jobFacade.failoverIfNecessary();
    try {
        jobFacade.afterJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
}
  • 首先檢查本機(jī)與注冊中心的時間誤差秒數(shù)是否在允許范圍糟描;
    • 如果maxTimeDiffSeconds配置為-1,表示不檢查书妻;
    • 需要注意一點船响,即使誤差過大,拋出異常,如果使用的是默認(rèn)的異常處理器灿意,那么也只是會打印error日志估灿,而并不會阻礙程序;
  • 然后獲取分片上下文缤剧;
    • 這里的獲取到的分片邏輯有點復(fù)雜馅袁,在2.2中在來具體解釋一下,因為涉及到了失效轉(zhuǎn)移的問題荒辕;
    • 如果沒有失效轉(zhuǎn)移汗销,那么獲取的分片就是當(dāng)前實例在執(zhí)行分片時獲取的分片,去除標(biāo)記了disable的分片抵窒;
  • 然后判斷弛针,如果當(dāng)前分片項仍在運行,是否需要設(shè)置任務(wù)被錯過執(zhí)行的標(biāo)記李皇;
    • 如果需要設(shè)置削茁,那么當(dāng)前任務(wù)將被跳過,并設(shè)置任務(wù)被錯過執(zhí)行的標(biāo)記掉房;
    • 如果不需要設(shè)置茧跋,那么接著執(zhí)行后面的邏輯;
    • 需要注意的是分片項是否正在運行的判斷邏輯卓囚;
      • 首先根據(jù)LiteJobConfiguration中的monitorExecution字段判斷是否監(jiān)控執(zhí)行瘾杭,默認(rèn)為true,如果為false哪亿,根本就不會設(shè)置 錯過執(zhí)行粥烁,即使上一次的定時任務(wù)還在執(zhí)行,這一次的定時任務(wù)也將啟動執(zhí)行蝇棉;
      • 其次讨阻,本項目實例拿到的分片中,有任意一個分片還在運行中,那么所持有的所有分片,都將錯過本次定時任務(wù)的執(zhí)行卓缰;
  • 然后執(zhí)行監(jiān)聽器(ElasticJobListener)的beforeJobExecuted方法米绕;
  • 然后執(zhí)行第二段邏輯,下面再講;
  • 然后判斷作業(yè)是否需要執(zhí)行錯過的任務(wù),如果需要,那么還是執(zhí)行第二段邏輯链患;
  • 被錯過執(zhí)行有兩種情況:
    • 開啟了monitorExecution,發(fā)現(xiàn)上一次的任務(wù)還在執(zhí)行中瓶您,那么本實例拿到的所有分片的這一次的定時任務(wù)都將被錯過麻捻;
    • 如果定時任務(wù)時間間距過小纲仍,如10秒,而任務(wù)執(zhí)行了12秒贸毕,那么quartz會觸發(fā)監(jiān)聽器JobTriggerListener郑叠,監(jiān)聽器會設(shè)置本作業(yè)實例的所有分片為錯過執(zhí)行;
  • 然后判斷如果需要失效轉(zhuǎn)移, 則執(zhí)行作業(yè)失效轉(zhuǎn)移明棍;
  • 最后執(zhí)行監(jiān)聽器的afterJobExecuted方法乡革;
  1. 第二段
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
        }
        return;
    }
    jobFacade.registerJobBegin(shardingContexts);
    String taskId = shardingContexts.getTaskId();
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    }
    try {
        process(shardingContexts, executionSource);
    } finally {
        // TODO 考慮增加作業(yè)失敗的狀態(tài),并且考慮如何處理作業(yè)失敗的整體回路
        jobFacade.registerJobCompleted(shardingContexts);
        if (itemErrorMessages.isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
            }
        } else {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            }
        }
    }
}
  • 該段代碼如果monitorExecution屬性沒有開啟摊腋,將不會有什么意義沸版;
    • 首先需要在注冊中心中增加running節(jié)點,表示任務(wù)正在運行兴蒸;
    • 當(dāng)處理完后视粮,在finally中再將running移除;
  1. 第三段
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    if (1 == items.size()) {
        int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
        process(shardingContexts, item, jobExecutionEvent);
        return;
    }
    final CountDownLatch latch = new CountDownLatch(items.size());
    for (final int each : items) {
        final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(new Runnable() {
            
            @Override
            public void run() {
                try {
                    process(shardingContexts, each, jobExecutionEvent);
                } finally {
                    latch.countDown();
                }
            }
        });
    }
    try {
        latch.await();
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}
  • 這里對ShardingContexts中是否只包含了一個分片做了區(qū)分處理
    • 如果只包含一個分片橙凳,比較好處理蕾殴,直接進(jìn)入第三段的處理即可;
    • 如果包含多個分片岛啸,那么因為第二段代碼是夾在第一段代碼的監(jiān)聽器的兩個方法中間執(zhí)行的区宇,必須要處理完所有的分片任務(wù),才能執(zhí)行監(jiān)聽器的afterJobExecuted方法值戳;
    • 這里使用了CountDownLatch對象,該對象在初始化的時候需要設(shè)置一個數(shù)量值炉爆,每調(diào)用一次countDown方法堕虹,則數(shù)量會減少1,如果值不為0芬首,那么線程會一直阻塞在await方法處赴捞,此處是為了實現(xiàn)線程等待而使用的,等待所有分片任務(wù)執(zhí)行完郁稍,再接著向下執(zhí)行赦政;
  1. 第四段
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobExecutionEvent(startEvent);
    }
    log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
    JobExecutionEvent completeEvent;
    try {
        process(new ShardingContext(shardingContexts, item));
        completeEvent = startEvent.executionSuccess();
        log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobExecutionEvent(completeEvent);
        }
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // CHECKSTYLE:ON
        completeEvent = startEvent.executionFailure(cause);
        jobFacade.postJobExecutionEvent(completeEvent);
        itemErrorMessages.put(item, ExceptionUtil.transform(cause));
        jobExceptionHandler.handleException(jobName, cause);
    }
}

protected abstract void process(ShardingContext shardingContext);
  • 這一段沒什么邏輯,除了調(diào)用真實的業(yè)務(wù)邏輯外耀怜,就是發(fā)布一些事件恢着;
  • 真正執(zhí)行的業(yè)務(wù)代碼,由子類提供财破;

2.2掰派,失效轉(zhuǎn)移

首先來解釋一下失效轉(zhuǎn)移的含義:

  • 就是在某個定時任務(wù),從這一次開始運行左痢,到下一次開始運行之前靡羡,這之間的時間段內(nèi)系洛,某一個或多個作業(yè)實例出現(xiàn)了問題(或者作業(yè)實例的部分分片出了問題),那么這些出問題的作業(yè)實例略步,這一次的定時任務(wù)就相當(dāng)于沒有執(zhí)行描扯,那么失效轉(zhuǎn)移功能就會將這些沒有執(zhí)行的分片,轉(zhuǎn)移到其他正常機(jī)器趟薄,馬上觸發(fā)執(zhí)行绽诚;
  • 當(dāng)下一次任務(wù)的時間點到來時,開始重新分片竟趾,然后繼續(xù)正常執(zhí)行憔购;

再來看看代碼層面的實現(xiàn):

  • 首先是JobCrashedJobListener監(jiān)聽到了需要失效轉(zhuǎn)移的分片項;
  • 然后逐一設(shè)置失效的分片項標(biāo)記岔帽,具體位置在注冊中心的:{jobName}/leader/failover/items/{item};
  • 然后開始逐一執(zhí)行作業(yè)失效轉(zhuǎn)移玫鸟,先執(zhí)行主節(jié)點選舉爭搶當(dāng)前分片,然后執(zhí)行回調(diào)犀勒;
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
    //CHECKSTYLE:OFF
    } catch (final Exception ex) {
    //CHECKSTYLE:ON
        handleException(ex);
    }
}
  • 回調(diào)使用的是FailoverLeaderExecutionCallback:
    • 首先向{jobName}/sharding/{item}/failover中填充臨時數(shù)據(jù)屎飘;
    • 然后移除先前的{jobName}/leader/failover/items/{item}中的數(shù)據(jù);
    • 最后構(gòu)建一個jobScheduleController對象贾费,調(diào)用triggerJob方法钦购,馬上進(jìn)行一次任務(wù)啟動操作;

再來看看AbstractElasticJobExecutor中獲取分片的邏輯:

public ShardingContexts getShardingContexts() {
    boolean isFailover = configService.load(true).isFailover();
    if (isFailover) {
        List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
        if (!failoverShardingItems.isEmpty()) {
            return executionContextService.getJobShardingContext(failoverShardingItems);
        }
    }
    shardingService.shardingIfNecessary();
    List<Integer> shardingItems = shardingService.getLocalShardingItems();
    if (isFailover) {
        shardingItems.removeAll(failoverService.getLocalTakeOffItems());
    }
    shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
    return executionContextService.getJobShardingContext(shardingItems);
}
  • 如果開啟了失效轉(zhuǎn)移褂萧,那么就去獲取{jobName}/sharding下所有item中的failover節(jié)點押桃,并篩選出當(dāng)前作業(yè)實例的failover,將這些failover對應(yīng)的item構(gòu)造成ShardingContexts直接返回导犹;這里的執(zhí)行邏輯唱凯,對應(yīng)的是上面jobScheduleController對象調(diào)用triggerJob方法;
  • 失效轉(zhuǎn)移執(zhí)行完成后谎痢,下一次再來獲取分片的時候磕昼,會執(zhí)行shardingIfNecessary方法,重新進(jìn)行分片节猿;
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末票从,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子滨嘱,更是在濱河造成了極大的恐慌峰鄙,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,470評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件太雨,死亡現(xiàn)場離奇詭異先馆,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)躺彬,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評論 3 392
  • 文/潘曉璐 我一進(jìn)店門煤墙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來梅惯,“玉大人,你說我怎么就攤上這事仿野∠臣酰” “怎么了?”我有些...
    開封第一講書人閱讀 162,577評論 0 353
  • 文/不壞的土叔 我叫張陵脚作,是天一觀的道長葫哗。 經(jīng)常有香客問我,道長球涛,這世上最難降的妖魔是什么劣针? 我笑而不...
    開封第一講書人閱讀 58,176評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮亿扁,結(jié)果婚禮上捺典,老公的妹妹穿的比我還像新娘。我一直安慰自己从祝,他們只是感情好襟己,可當(dāng)我...
    茶點故事閱讀 67,189評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著牍陌,像睡著了一般擎浴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上毒涧,一...
    開封第一講書人閱讀 51,155評論 1 299
  • 那天贮预,我揣著相機(jī)與錄音,去河邊找鬼契讲。 笑死萌狂,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的怀泊。 我是一名探鬼主播,決...
    沈念sama閱讀 40,041評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼误趴,長吁一口氣:“原來是場噩夢啊……” “哼霹琼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起凉当,我...
    開封第一講書人閱讀 38,903評論 0 274
  • 序言:老撾萬榮一對情侶失蹤枣申,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后看杭,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體忠藤,經(jīng)...
    沈念sama閱讀 45,319評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,539評論 2 332
  • 正文 我和宋清朗相戀三年楼雹,在試婚紗的時候發(fā)現(xiàn)自己被綠了模孩。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片尖阔。...
    茶點故事閱讀 39,703評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖榨咐,靈堂內(nèi)的尸體忽然破棺而出介却,到底是詐尸還是另有隱情,我是刑警寧澤块茁,帶...
    沈念sama閱讀 35,417評論 5 343
  • 正文 年R本政府宣布齿坷,位于F島的核電站,受9級特大地震影響数焊,放射性物質(zhì)發(fā)生泄漏永淌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,013評論 3 325
  • 文/蒙蒙 一佩耳、第九天 我趴在偏房一處隱蔽的房頂上張望遂蛀。 院中可真熱鬧,春花似錦蚕愤、人聲如沸答恶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽悬嗓。三九已至,卻和暖如春裕坊,著一層夾襖步出監(jiān)牢的瞬間包竹,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評論 1 269
  • 我被黑心中介騙來泰國打工籍凝, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留周瞎,地道東北人。 一個月前我還...
    沈念sama閱讀 47,711評論 2 368
  • 正文 我出身青樓饵蒂,卻偏偏與公主長得像声诸,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子退盯,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,601評論 2 353

推薦閱讀更多精彩內(nèi)容