XXL-JOB 概述

XXL-JOB是一個分布式任務(wù)調(diào)度平臺休玩,其核心設(shè)計目標(biāo)是開發(fā)迅速躏结、學(xué)習(xí)簡單、輕量級彰居、易擴展〕现剑現(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用

文檔及源碼

中文文檔

源碼倉庫地址

源碼倉庫地址 Release Download
https://github.com/xuxueli/xxl-job Download
http://gitee.com/xuxueli0323/xxl-job Download

一陈惰、調(diào)度中心部署

1.1 初始化“調(diào)度數(shù)據(jù)庫”

數(shù)據(jù)庫中根據(jù)“調(diào)度數(shù)據(jù)庫初始化SQL腳本”創(chuàng)建調(diào)度數(shù)據(jù)庫

“調(diào)度數(shù)據(jù)庫初始化SQL腳本” 位置為:

調(diào)度中心支持集群部署畦徘,集群情況下各節(jié)點務(wù)必連接同一個mysql實例

如果mysql做主從,調(diào)度中心集群節(jié)點務(wù)必強制走主庫

1.2 部署 xxl-job-admin(調(diào)度中心)

統(tǒng)一管理任務(wù)調(diào)度平臺上調(diào)度任務(wù),負責(zé)觸發(fā)調(diào)度執(zhí)行抬闯,并且提供任務(wù)管理平臺

1.2.1 調(diào)度中心配置

### 調(diào)度中心JDBC鏈接
spring.datasource.url=jdbc:mysql://database_ip:port/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=your_username
spring.datasource.password=your_password
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
### 報警郵箱
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### 調(diào)度中心通訊TOKEN:非空時啟用井辆;
xxl.job.accessToken=
### 調(diào)度中心國際化配置:"zh_CN"/中文簡體, "zh_TC"/中文繁體 and "en"/英文;
xxl.job.i18n=zh_CN
## 調(diào)度線程池最大線程配置
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
### 調(diào)度中心日志表數(shù)據(jù)保存天數(shù):過期日志自動清理溶握;限制大于等于7時生效杯缺,否則, 如-1,關(guān)閉自動清理功能睡榆;
xxl.job.logretentiondays=30

1.2.2 調(diào)度中心部署

如果已經(jīng)正確進行上述配置萍肆,可將項目編譯打包部署,本例使用的測試服務(wù)器 ip 為:10.91.198.13胀屿,端口為 18080

調(diào)度中心訪問地址:http://10.91.198.13:18080/xxl-job-admin (該地址執(zhí)行器將會使用到塘揣,作為回調(diào)地址)

默認登錄賬號 “admin/123456”, 登錄后運行界面如下圖所示

1.3 調(diào)度中心集群

調(diào)度中心集群部署時,有以下注意點:

  • DB配置保持一致
  • 集群機器時鐘保持一致(單機集群忽視)
  • 建議:推薦通過nginx為調(diào)度中心集群做負載均衡宿崭,分配域名亲铡。調(diào)度中心訪問、執(zhí)行器回調(diào)配置、調(diào)用API服務(wù)等操作均通過該域名進行

二奖蔓、現(xiàn)有項目接入

2.1 引入POM

<!-- xxl-job-core -->
<dependency>
    <groupId>com.saicmotor.com</groupId>
    <artifactId>sp-xxl-job-core</artifactId>
    <version>1.0.0</version>
</dependency>

2.2 添加 xxl-job 配置項

# 項目端口
server.port=8081
# no web
#spring.main.web-environment=false

# 日志配置
logging.config=classpath:logback.xml

### 調(diào)度中心部署跟地址 [選填]:如調(diào)度中心集群部署存在多個地址則用逗號分隔赞草。執(zhí)行器將會使用該地址進行"執(zhí)行器心跳注冊"和"任務(wù)結(jié)果回調(diào)";為空則關(guān)閉自動注冊
xxl.job.admin.addresses=http://10.91.198.13:18080/xxl-job-admin

### 執(zhí)行器通訊TOKEN [選填]:非空時啟用
xxl.job.accessToken=

### 執(zhí)行器AppName [選填]:執(zhí)行器心跳注冊分組依據(jù)吆鹤;為空則關(guān)閉自動注冊
xxl.job.executor.appname=executor-test-lzn
### 執(zhí)行器注冊 [選填]:優(yōu)先使用該配置作為注冊地址厨疙,為空時使用內(nèi)嵌服務(wù) ”IP:PORT“ 作為注冊地址。從而更靈活的支持容器類型執(zhí)行器動態(tài)IP和動態(tài)映射端口問題
xxl.job.executor.address=http://項目ip:執(zhí)行器端口號
### 執(zhí)行器IP [選填]:默認為空表示自動獲取IP檀头,多網(wǎng)卡時可手動設(shè)置指定IP轰异,該IP不會綁定Host僅作為通訊實用;地址信息用于 "執(zhí)行器注冊" 和 "調(diào)度中心請求并觸發(fā)任務(wù)"
xxl.job.executor.ip=項目ip
### 執(zhí)行器端口號 [選填]:小于等于0則自動獲仁钍肌;默認端口為9999婴削,單機部署多個執(zhí)行器時廊镜,注意要配置不同執(zhí)行器端口
xxl.job.executor.port=9999
### 執(zhí)行器運行日志文件存儲磁盤路徑 [選填] :需要對該路徑擁有讀寫權(quán)限;為空則使用默認路徑
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 執(zhí)行器日志文件保存天數(shù) [選填] : 過期日志自動清理, 限制值大于等于3時生效; 否則, 如-1, 關(guān)閉自動清理功能
xxl.job.executor.logretentiondays=30

2.3 添加執(zhí)行器組件配置(現(xiàn)已自動裝配)

@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
}

2.4 自定義定時任務(wù)

開發(fā)步驟:

  • 在 Spring Bean 實例中唉俗,開發(fā) Job 方法嗤朴,方式格式要求為 "public ReturnT<String> execute(String param)"
  • 為 Job 方法添加注解 "@XxlJob(value="自定義 jobhandler 名稱", init = "JobHandler 初始化方法", destroy = "JobHandler 銷毀方法")",注解value值對應(yīng)的是調(diào)度中心新建任務(wù)的 JobHandler 屬性的值
/**
 * XxlJob開發(fā)示例
 */
@Component
public class SampleXxlJobPools {
    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);

    @XxlJob(value = "testJobHandler", init = "init", destroy = "destroy")
    public ReturnT<String> demoJobHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World, Hello zll");
        return ReturnT.SUCCESS;
    }

    public void init() {
        logger.info("testJobHandler init");
    }

    public void destroy() {
        logger.info("testJobHandler destory");
    }
}

2.5 配置執(zhí)行器

進入調(diào)度中心:http://10.91.198.13:18080/xxl-job-admin

進入執(zhí)行器管理虫溜,點擊操作 -> 編輯雹姊,進入執(zhí)行器編輯頁面

2.6 配置及執(zhí)行任務(wù)

進入調(diào)度中心:http://10.91.198.13:18080/xxl-job-admin

進入任務(wù)管理,點擊新增衡楞,進入新增任務(wù)頁面

任務(wù)配置參數(shù)釋義如下:

參數(shù) 說明
執(zhí)行器 任務(wù)的綁定的執(zhí)行器吱雏,任務(wù)觸發(fā)調(diào)度時將會自動發(fā)現(xiàn)注冊成功的執(zhí)行器, 實現(xiàn)任務(wù)自動發(fā)現(xiàn)功能; 另一方面也可以方便的進行任務(wù)分組。每個任務(wù)必須綁定一個執(zhí)行器, 可在 "執(zhí)行器管理" 進行設(shè)置
任務(wù)描述 任務(wù)的描述信息瘾境,便于任務(wù)管理
路由策略 FIRST(第一個):固定選擇第一個機器歧杏;
LAST(最后一個):固定選擇最后一個機器;
ROUND(輪詢):輪詢迷守;
RANDOM(隨機):隨機選擇在線的機器犬绒;
CONSISTENT_HASH(一致性HASH):每個任務(wù)按照Hash算法固定選擇某一臺機器,且所有任務(wù)均勻散列在不同機器上兑凿;
LEAST_FREQUENTLY_USED(最不經(jīng)常使用):使用頻率最低的機器優(yōu)先被選舉凯力;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的機器優(yōu)先被選舉;
FAILOVER(故障轉(zhuǎn)移):按照順序依次進行心跳檢測礼华,第一個心跳檢測成功的機器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度咐鹤;
BUSYOVER(忙碌轉(zhuǎn)移):按照順序依次進行空閑檢測,第一個空閑檢測成功的機器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度卓嫂;
SHARDING_BROADCAST(分片廣播):廣播觸發(fā)對應(yīng)集群中所有機器執(zhí)行一次任務(wù)慷暂,同時系統(tǒng)自動傳遞分片參數(shù);可根據(jù)分片參數(shù)開發(fā)分片任務(wù);
Cron 觸發(fā)任務(wù)執(zhí)行的Cron表達式
運行模式 BEAN模式:任務(wù)以JobHandler方式維護在執(zhí)行器端行瑞;需要結(jié)合 "JobHandler" 屬性匹配執(zhí)行器中任務(wù)奸腺;
GLUE模式(Java):任務(wù)以源碼方式維護在調(diào)度中心;該模式的任務(wù)實際上是一段繼承自IJobHandler的Java類代碼并 "groovy" 源碼方式維護血久,它在執(zhí)行器項目中運行突照,可使用@Resource/@Autowire注入執(zhí)行器里中的其他服務(wù);
GLUE模式(Shell):任務(wù)以源碼方式維護在調(diào)度中心氧吐;該模式的任務(wù)實際上是一段 "shell" 腳本讹蘑;
GLUE模式(Python):任務(wù)以源碼方式維護在調(diào)度中心;該模式的任務(wù)實際上是一段 "python" 腳本筑舅;
GLUE模式(PHP):任務(wù)以源碼方式維護在調(diào)度中心座慰;該模式的任務(wù)實際上是一段 "php" 腳本;
GLUE模式(NodeJS):任務(wù)以源碼方式維護在調(diào)度中心翠拣;該模式的任務(wù)實際上是一段 "nodejs" 腳本版仔;
GLUE模式(PowerShell):任務(wù)以源碼方式維護在調(diào)度中心;該模式的任務(wù)實際上是一段 "PowerShell" 腳本误墓;
JobHandler 行模式為 "BEAN模式" 時生效蛮粮,對應(yīng)執(zhí)行器中新開發(fā)的 JobHandler 類 “@JobHandler” 注解自定義的 value 值
阻塞處理策略 調(diào)度過于密集執(zhí)行器來不及處理時的處理策略;
單機串行(默認):調(diào)度請求進入單機執(zhí)行器后谜慌,調(diào)度請求進入FIFO隊列并以串行方式運行然想;
丟棄后續(xù)調(diào)度:調(diào)度請求進入單機執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)欣范,本次請求將會被丟棄并標(biāo)記為失敱湫埂;
覆蓋之前調(diào)度:調(diào)度請求進入單機執(zhí)行器后熙卡,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)杖刷,將會終止運行中的調(diào)度任務(wù)并清空隊列,然后運行本地調(diào)度任務(wù)驳癌;
子任務(wù) 每個任務(wù)都擁有一個唯一的任務(wù)ID(任務(wù)ID可以從任務(wù)列表獲取)滑燃,當(dāng)本任務(wù)執(zhí)行結(jié)束并且執(zhí)行成功時,將會觸發(fā)子任務(wù)ID所對應(yīng)的任務(wù)的一次主動調(diào)度
任務(wù)超時時間 支持自定義任務(wù)超時時間颓鲜,任務(wù)運行超時將會主動中斷任務(wù)
失敗重試次數(shù) 支持自定義任務(wù)失敗重試次數(shù)表窘,當(dāng)任務(wù)失敗時將會按照預(yù)設(shè)的失敗重試次數(shù)主動進行重試
報警郵件 任務(wù)調(diào)度失敗時郵件通知的郵箱地址,支持配置多郵箱地址甜滨,配置多個郵箱地址時用逗號分隔
負責(zé)人 任務(wù)的負責(zé)人
任務(wù)參數(shù) 任務(wù)執(zhí)行所需的參數(shù)

配置任務(wù)后乐严,右擊選擇執(zhí)行一次進行任務(wù)調(diào)度測試

任務(wù)執(zhí)行后進入調(diào)度日志,可以查看任務(wù)執(zhí)行日志

至此項目的任務(wù)配置完成

2.7 執(zhí)行器集群

執(zhí)行器集群部署時衣摩,幾點要求和建議:

  • 執(zhí)行器回調(diào)地址(xxl.job.admin.addresses)需要保持一致昂验;執(zhí)行器根據(jù)該配置進行執(zhí)行器自動注冊等操作
  • 同一個執(zhí)行器集群內(nèi)AppName(xxl.job.executor.appname)需要保持一致;調(diào)度中心根據(jù)該配置動態(tài)發(fā)現(xiàn)不同集群的在線執(zhí)行器列表

三、用戶管理

進入 “用戶管理” 界面既琴,可查看和管理用戶信息占婉;

目前用戶分為兩種角色:

  • 管理員:擁有全量權(quán)限,支持在線管理用戶信息甫恩,為用戶分配權(quán)限逆济,權(quán)限分配粒度為執(zhí)行器;
  • 普通用戶:僅擁有被分配權(quán)限的執(zhí)行器磺箕,及相關(guān)任務(wù)的操作權(quán)限奖慌;

四、任務(wù)詳解

4.1 BEAN模式(類形式)

Bean模式任務(wù)松靡,支持基于方法的開發(fā)方式简僧,每個任務(wù)對應(yīng)一個方法。

  • 優(yōu)點:
    • 每個任務(wù)只需要開發(fā)一個方法击困,并添加”@XxlJob”注解即可涎劈,更加方便、快速阅茶。
    • 支持自動掃描任務(wù)并注入到執(zhí)行器容器。
  • 缺點:要求Spring容器環(huán)境谅海;

4.1.1 開發(fā)方法詳解

  1. 在Spring Bean實例中脸哀,開發(fā)Job方法,方式格式要求為 "public ReturnT<String> execute(String param)"
  2. 為Job方法添加注解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")"扭吁,注解value值對應(yīng)的是調(diào)度中心新建任務(wù)的JobHandler屬性的值
  3. 執(zhí)行日志:需要通過 "XxlJobLogger.log" 打印執(zhí)行日志

4.1.2 內(nèi)置 Bean 模式任務(wù)

為方便用戶參考與快速實用撞蜂,示例執(zhí)行器內(nèi)原生提供多個Bean模式任務(wù)Handler,可以直接配置實用侥袜,如下:

  • shardingJobHandler:分片示例任務(wù)蝌诡,任務(wù)內(nèi)部模擬處理分片參數(shù),可參考熟悉分片任務(wù)

  • httpJobHandler:通用HTTP任務(wù)Handler枫吧;業(yè)務(wù)方只需要提供HTTP鏈接等信息即可浦旱,不限制語言傀蚌、平臺厘熟。示例任務(wù)入?yún)⑷缦拢?/p>

    url: http://www.xxx.com  
    method: get 或 post  
    data: post-data
    
  • commandJobHandler:通用命令行任務(wù)Handler;業(yè)務(wù)方只需要提供命令行即可搁胆;如 “pwd”命令

4.2 GLUE模式(Java)

任務(wù)以源碼方式維護在調(diào)度中心例隆,支持通過 Web IDE 在線更新甥捺,實時編譯和生效,因此不需要指定 JobHandler

4.2.1 基本開發(fā)流程

調(diào)度中心镀层,新建調(diào)度任務(wù)

參考新建的任務(wù)進行參數(shù)配置镰禾,運行模式選中 “GLUE模式(Java)”

開發(fā)任務(wù)代碼

選中指定任務(wù),點擊該任務(wù)右側(cè)“GLUE”按鈕,將會前往GLUE任務(wù)的Web IDE界面吴侦,在該界面支持對任務(wù)代碼進行開發(fā)(也可以在IDE中開發(fā)完成后屋休,復(fù)制粘貼到編輯中)。

版本回溯功能(支持30個版本的版本回溯):在GLUE任務(wù)的Web IDE界面妈倔,選擇右上角下拉框“版本回溯”博投,會列出該GLUE的更新歷史,選擇相應(yīng)版本即可顯示該版本代碼盯蝴,保存后GLUE代碼即回退到對應(yīng)的歷史版本毅哗;

執(zhí)行任務(wù)

在任務(wù)管理器中執(zhí)行任務(wù)即可

4.2.2 注入執(zhí)行器中的 Bean 實例

在線編輯的代碼可以通過 @Autowired or @Resource 注解使用執(zhí)行器中注冊在 spring 容器中的 bean 實例,具體配置如下:

添加 SpringUtils 獲取 IOC 容器
/**
 * 獲取 Spring IOC 容器
 *
 */
public class SpringUtils implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringUtils.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}
添加測試服務(wù)類
@Service
public class TestService {

    @Value("lzn")
    private String name;

    public TestService() {
    }

    public TestService(String name) {
        this.name = name;
    }

    public String testHello() {
        return "testHello: " + name;
    }
}
修改 XxlJobConfig 配置類

為 XxlJobSpringExecutor 設(shè)置 ApplicationContext

@Bean 
public XxlJobSpringExecutor xxlJobExecutor() {
    logger.info(">>>>>>>>>>> xxl-job config init.");
    XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    xxlJobSpringExecutor.setAppname(appname);
    xxlJobSpringExecutor.setAddress(address);
    xxlJobSpringExecutor.setIp(ip);
    xxlJobSpringExecutor.setPort(port);
    xxlJobSpringExecutor.setAccessToken(accessToken);
    xxlJobSpringExecutor.setLogPath(logPath);
    xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    xxlJobSpringExecutor.setApplicationContext(SpringUtils.getApplicationContext());

    return xxlJobSpringExecutor;
}
任務(wù)代碼修改如下

任務(wù)執(zhí)行后捧挺,執(zhí)行日志如下虑绵,可以看到 TestService#testHello() 方法成功注入并執(zhí)行

4.3 GLUE模式(Shell)

待補充

五、XXL-JOB VS. Kangaroo ETL

5.1 ETL

ETL(Extraction-Transformation-Loading)用來描述將數(shù)據(jù)從來源端經(jīng)過抽让隼印(extract)翅睛、轉(zhuǎn)換(transform)、加載(load)至目的端的過程黑竞。ETL負責(zé)將分布的捕发、異構(gòu)數(shù)據(jù)源中的數(shù)據(jù)如關(guān)系數(shù)據(jù)、平面數(shù)據(jù)文件等抽取到臨時中間層后進行清洗很魂、轉(zhuǎn)換扎酷、集成,最后加載到數(shù)據(jù)倉庫或數(shù)據(jù)集市中遏匆,成為聯(lián)機分析處理法挨、數(shù)據(jù)挖掘的基礎(chǔ)。

ETL 是數(shù)據(jù)倉庫獲取高質(zhì)量數(shù)據(jù)的關(guān)鍵環(huán)節(jié)幅聘,也是BI(商業(yè)智能)項目最重要的一個環(huán)節(jié)凡纳,可以說,ETL設(shè)計的好壞直接關(guān)系到BI項目的成敗帝蒿。

ETL是一個長期的過程荐糜,通過對分散在各業(yè)務(wù)系統(tǒng)中相互關(guān)聯(lián)的分布式異構(gòu)數(shù)據(jù)進行提取(Extract)陵叽、清洗狞尔、轉(zhuǎn)換(Transform)和加載(Load),使這些數(shù)據(jù)成為BI系統(tǒng)需要的有用數(shù)據(jù)巩掺。

5.2 ETL 流程

5.2.1. 數(shù)據(jù)提绕颉(Data Extract)

數(shù)據(jù)提取指的是從不同的網(wǎng)絡(luò)、操作平臺胖替、應(yīng)用研儒、數(shù)據(jù)庫和數(shù)據(jù)格式中抽取數(shù)據(jù)的過程豫缨。

在這個過程中,首先需要根據(jù)實際業(yè)務(wù)需求進行抽取字段的確定端朵,形成一張公共需求的清單好芭,同時清單中的需求字段也應(yīng)當(dāng)和數(shù)據(jù)庫中的字段形成一一映射關(guān)系,這樣通過數(shù)據(jù)抽取得到的數(shù)據(jù)都能夠整齊劃一冲呢,為后續(xù)數(shù)據(jù)轉(zhuǎn)換和加載提供基礎(chǔ)舍败。

具體實現(xiàn)過程中,將會涉及到如何解決從不同類型的數(shù)據(jù)庫(如Oracle敬拓、MySQL邻薯、DB2)、不同類型的文件系統(tǒng)(如Linux乘凸、Windows)以何種方式(如數(shù)據(jù)庫抽取厕诡、文件傳輸、流式)营勤、何種頻率(分鐘灵嫌、小時、天葛作、周寿羞、月等)獲取數(shù)據(jù)的問題。

5.2.2 數(shù)據(jù)轉(zhuǎn)換(Data Transformation)

數(shù)據(jù)轉(zhuǎn)換是指將數(shù)據(jù)從一種表現(xiàn)形式變?yōu)榱硪环N表現(xiàn)形式的過程赂蠢,即對數(shù)據(jù)進行整合稠曼、拆分和變換。

  • 數(shù)據(jù)整合是指通過多表關(guān)聯(lián)客年,將不同類型數(shù)據(jù)之間可能存在潛在關(guān)聯(lián)關(guān)系的多條數(shù)據(jù)進行合并,通過數(shù)據(jù)的整合漠吻,豐富數(shù)據(jù)維度量瓜,有利于發(fā)現(xiàn)更多有價值的信息。
  • 數(shù)據(jù)拆分是指按一定規(guī)則對數(shù)據(jù)進行拆分途乃,將一條數(shù)據(jù)拆分為多條绍傲。
  • 數(shù)據(jù)變換是指對數(shù)據(jù)進行行列轉(zhuǎn)換、排序耍共、修改序號烫饼、去除重復(fù)記錄變換操作。

數(shù)據(jù)轉(zhuǎn)換一般包括兩類工作:

一種是對數(shù)據(jù)名稱及格式進行統(tǒng)一试读,另一種則是數(shù)據(jù)倉庫中可能會有源數(shù)據(jù)庫中不存在的數(shù)據(jù)杠纵,因此需要進行字段的組合、分割或計算钩骇。

數(shù)據(jù)轉(zhuǎn)換其實還包含了數(shù)據(jù)清洗的工作比藻,根據(jù)業(yè)務(wù)規(guī)則對異常數(shù)據(jù)進行清洗铝量,將不完整、錯誤或重復(fù)的數(shù)據(jù)進行相應(yīng)處理银亲,以保證后續(xù)分析結(jié)果的準確性慢叨。

5.2.3 數(shù)據(jù)加載(Data Loading)

數(shù)據(jù)加載的主要任務(wù)是將清洗過后的干凈的數(shù)據(jù)集按物理數(shù)據(jù)模型定義的表結(jié)構(gòu)裝入目標(biāo)數(shù)據(jù)倉庫的數(shù)據(jù)表中,并允許人工干預(yù)务蝠,以及提供強大的錯誤報告拍谐、系統(tǒng)日志、數(shù)據(jù)備份與恢復(fù)功能馏段。

數(shù)據(jù)加載的方式主要有:增量加載(時間戳方式轩拨、日志表方式、全表比對方式)毅弧、全量加載(全表刪除再插入方式)气嫁。整個操作過程中往往要跨網(wǎng)絡(luò)、跨操作平臺够坐。實際工作中寸宵,數(shù)據(jù)加載需要結(jié)合使用的數(shù)據(jù)庫系統(tǒng),確定最優(yōu)的數(shù)據(jù)加載方案元咙,節(jié)約CPU梯影、硬盤IO和網(wǎng)絡(luò)傳輸資源。

5.3 指標(biāo)對比

XXL-JOB Kangaroo ETL
定位 面向系統(tǒng)定時任務(wù)的任務(wù)調(diào)度平臺 面向數(shù)據(jù)服務(wù)的任務(wù)調(diào)度平臺
任務(wù)類型 1 Bean模式:每個Bean模式任務(wù)都是一個Spring的Bean類實例庶香,它被維護在“執(zhí)行器”項目的Spring容器中
2 GLUE模式(Java):通過Groovy類加載器加載此代碼甲棍,實例化成Java對象,執(zhí)行任務(wù)邏輯
3 GLUE模式(Shell) + GLUE模式(Python) + GLUE模式(PHP) + GLUE模式(NodeJS) + GLUE模式(Powershell):腳本任務(wù)的源碼托管在調(diào)度中心赶掖,腳本邏輯在執(zhí)行器運行感猛。通過Java代碼調(diào)用腳本,在調(diào)度中心可以實時監(jiān)控腳本運行情況
1 DX 任務(wù):支持 Oracle奢赂、MySQL陪白、SQL Server、Text 文件膳灶、HDFS(RCFile,TextFile,Orc 格式)5 種數(shù)據(jù)源之間的數(shù)據(jù)交換
2 Script 任務(wù):支持 shell 腳本咱士,通常用于執(zhí)行 HiveSQL、SparkSQL轧钓、Jar 等
系統(tǒng)侵入性 有侵入 無侵入
系統(tǒng)兼容性 兼容 Spring序厉、Dubbo等主流框架。針對非Java應(yīng)用毕箍,可借助 XXL-JOB 的標(biāo)準 RESTful API 方便的實現(xiàn)多語言支持 -
適用場景 將系統(tǒng)的定時任務(wù)和調(diào)度解耦弛房,任務(wù)調(diào)度交給統(tǒng)一平臺進行管理,提高系統(tǒng)可用性和穩(wěn)定性 將分布的霉晕、異構(gòu)的數(shù)據(jù)源中的數(shù)據(jù)通過合理的任務(wù)調(diào)度進行抽取庭再、轉(zhuǎn)換和加載捞奕,為后續(xù)的數(shù)據(jù)分析、數(shù)據(jù)挖掘做基礎(chǔ)

六拄轻、總體設(shè)計

6.1 架構(gòu)設(shè)計

6.1.1 系統(tǒng)組成

  • 調(diào)度模塊(調(diào)度中心)
    負責(zé)管理調(diào)度信息颅围,按照調(diào)度配置發(fā)出調(diào)度請求,自身不承擔(dān)業(yè)務(wù)代碼恨搓。調(diào)度系統(tǒng)與任務(wù)解耦院促,提高了系統(tǒng)可用性和穩(wěn)定性,同時調(diào)度系統(tǒng)性能不再受限于任務(wù)模塊斧抱;
    支持可視化常拓、簡單且動態(tài)的管理調(diào)度信息,包括任務(wù)新建辉浦,更新弄抬,刪除,GLUE開發(fā)和任務(wù)報警等宪郊,所有上述操作都會實時生效掂恕,同時支持監(jiān)控調(diào)度結(jié)果以及執(zhí)行日志,支持執(zhí)行器Failover弛槐;
  • 執(zhí)行模塊(執(zhí)行器)
    負責(zé)接收調(diào)度請求并執(zhí)行任務(wù)邏輯懊亡。任務(wù)模塊專注于任務(wù)的執(zhí)行等操作,開發(fā)和維護更加簡單和高效乎串;
    接收“調(diào)度中心”的執(zhí)行請求店枣、終止請求和日志請求等;

6.1.2 架構(gòu)圖

6.2 任務(wù)注冊和自動發(fā)現(xiàn)

  • AppName: 每個執(zhí)行器機器集群的唯一標(biāo)示, 任務(wù)注冊以 "執(zhí)行器" 為最小粒度進行注冊; 每個任務(wù)通過其綁定的執(zhí)行器可感知對應(yīng)的執(zhí)行器機器列表
  • 注冊表: 見"xxl_job_registry"表, "執(zhí)行器" 在進行任務(wù)注冊時將會周期性維護一條注冊記錄叹誉,即機器地址和AppName的綁定關(guān)系; "調(diào)度中心" 從而可以動態(tài)感知每個AppName在線的機器列表
  • 執(zhí)行器注冊: 任務(wù)注冊Beat周期默認30s; 執(zhí)行器以一倍Beat進行執(zhí)行器注冊, 調(diào)度中心以一倍Beat進行動態(tài)任務(wù)發(fā)現(xiàn); 注冊信息的失效時間為三倍Beat
  • 執(zhí)行器注冊摘除:執(zhí)行器銷毀時鸯两,將會主動上報調(diào)度中心并摘除對應(yīng)的執(zhí)行器機器信息,提高心跳注冊的實時性

七 源碼解析

xxl-job時序圖

7.1 XxlJobSpringExecutor 啟動

diagram

項目啟動后獲取配置文件中的屬性值并實例化配置類 XxlJobSpringExecutor长豁,XxlJobSpringExecutor 實現(xiàn)了 SmartInitializingSingleton 接口的 afterSingletonsInstantiated 方法甩卓,在實例化 XxlJobSpringExecutor 后觸發(fā)執(zhí)行,XxlJobSpringExecutor 在該方法里做了兩件事:

  1. 調(diào)用 initJobHandlerMethodRepository 方法掃描項目中帶 @XxlJob 注解的方法(即 jobHandler)并注冊;

  2. 調(diào)用父類 XxlJobExecutor 的 start 方法啟動 Executor缀棍,并初始化核心組件:

    • 初始化 admin 控制臺:initAdminBizList(adminAddresses, accessToken)

    • 初始化日志清理進程 JobLogFileCleanThread:JobLogFileCleanThread.getInstance().start(logRetentionDays)

    • 初始化觸發(fā)回調(diào)進程 TriggerCallbackThread:TriggerCallbackThread.getInstance().start()

    • 初始化內(nèi)置服務(wù) executor-server:initEmbedServer(address, ip, port, appname, accessToken)

7.1.1 jobHandler 注冊

在項目啟動時宅此,執(zhí)行器會通過 “@JobHandler” 識別 Spring 容器中 “Bean模式任務(wù)”,以注解的 value 屬性為 key 管理起來爬范,保存至 jobHandlerRepository父腕。

1 initJobHandlerMethodRepository(ApplicationContext applicationContext)

該方法掃描項目中的類,將帶有 @XxlJob 注解的 job 進行解析青瀑,并調(diào)用 registJobHandler(String name, IJobHandler jobHandler) 進行注冊:

// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

// registry jobhandler
/**
 * name: @XxlJob 的 value, jobHandler 的名字
 * bean:帶 @XxlJob 方法對應(yīng)的類名
 * method: 帶 @XxlJob 注解的方法名
 * initMethod: @XxlJob 的 init 值對應(yīng)方法的全限定名璧亮,即 execute 方法調(diào)用前執(zhí)行的方法
 * destroyMethod: @XxlJob 的 destroy 值對應(yīng)方法的全限定名萧诫,即 execute 方法調(diào)用后執(zhí)行的方法
 */
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));

2 MethodJobHandler

public class MethodJobHandler extends IJobHandler {

    private final Object target;
    private final Method method;
    private Method initMethod;
    private Method destroyMethod;

    public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
        this.target = target;
        this.method = method;

        this.initMethod =initMethod;
        this.destroyMethod =destroyMethod;
    }

    @Override
    public ReturnT<String> execute(String param) throws Exception {
        return (ReturnT<String>) method.invoke(target, new Object[]{param});
    }

    @Override
    public void init() throws InvocationTargetException, IllegalAccessException {
        if(initMethod != null) {
            initMethod.invoke(target);
        }
    }

    @Override
    public void destroy() throws InvocationTargetException, IllegalAccessException {
        if(destroyMethod != null) {
            destroyMethod.invoke(target);
        }
    }
}

7.1.2 XxlJobSpringExecutor 初始化

7.1.2.1 初始化 admin 控制臺

initAdminBizList 實例化 AdminBizClient,并將其存在 adminBizList 中

private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses!=null && adminAddresses.trim().length()>0) {
        // 多個 admin 控制臺地址以 "," 分隔
        for (String address: adminAddresses.trim().split(",")) {
            if (address!=null && address.trim().length()>0) {
                AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
                if (adminBizList == null) {
                    adminBizList = new ArrayList<AdminBiz>();
                }
                adminBizList.add(adminBiz);
            }
        }
    }
}
7.1.2.2 初始化日志清理進程
7.1.2.3 初始化觸發(fā)回調(diào)進程
7.1.2.4 初始化內(nèi)置服務(wù)
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

initEmbedServer 方法內(nèi)部直接調(diào)用 EmbedServer#start 方法

public void start(final String address, final int port, final String appname, final String accessToken) {
    // 1 初始化 executorBiz 執(zhí)行單元和線程池
    executorBiz = new ExecutorBizImpl();
    ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
        0,
        200,
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
            }
        },
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
            }
        });
    // 2 啟動 ServerBootstrap 并綁定端口號
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                            .addLast(new HttpServerCodec())
                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                }
            })
            .childOption(ChannelOption.SO_KEEPALIVE, true);

    ChannelFuture future = bootstrap.bind(port).sync();
    
    // 3 將執(zhí)行器信息注冊到調(diào)度中心(admin 控制臺)
    startRegistry(appname, address);
    
    // 4 wait util stop
    future.channel().closeFuture().sync();
}

EmbedServer 的 start 方法主要處理以下幾件事:

  1. 初始化 executorBiz 和線程池 bizThreadPool

  2. 啟動 ServerBootstrap 并綁定端口號

    調(diào)度中心實際的調(diào)度請求由 EmbedHttpServerHandler 處理

  3. 將執(zhí)行器信息注冊到調(diào)度中心(admin 控制臺)

    public void startRegistry(final String appname, final String address) {
        // start registry
        ExecutorRegistryThread.getInstance().start(appname, address);
    }
    

    ExecutorRegistryThread#start 方法封裝執(zhí)行器信息枝嘶,并調(diào)用 AdminBizClient#registry 方法將執(zhí)行器信息注冊于調(diào)度中心帘饶,核心代碼如下:

    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        try {
            // AdminBizClient 實現(xiàn) AdminBiz 接口,將執(zhí)行器信息進行注冊
            ReturnT<String> registryResult = adminBiz.registry(registryParam);
            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                registryResult = ReturnT.SUCCESS;
                logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                break;
            } else {
                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
            }
        } catch (Exception e) {
            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
        }
    }
    

7.2 調(diào)度平臺發(fā)布任務(wù)

前端頁面處罰具體任務(wù)后群扶,轉(zhuǎn)發(fā)至 JobInfoController 的 triggerJob 方法

/**
 * 處理觸發(fā)請求
 *
 * @param id                任務(wù) id
 * @param executorParam     執(zhí)行器參數(shù)
 * @param addressList       執(zhí)行器地址
 * @return
 */
@RequestMapping("/trigger")
@ResponseBody
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
    JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
    return ReturnT.SUCCESS;
}

JobTriggerPoolHelper 的 trigger 方法調(diào)用 addTrigger 方法及刻,addTrigger 方法將調(diào)度請求交給線程池處理,線程池中通過 XxlJobTrigger 的 trigger 方法處理實際請求

public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    // 獲取任務(wù)詳細信息
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid竞阐,jobId={}", jobId);
        return;
    }
    // 設(shè)置任務(wù)執(zhí)行參數(shù)
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    // 獲取執(zhí)行器詳細信息
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // 如果手動設(shè)置執(zhí)行任務(wù)機器的地址缴饭,則覆蓋執(zhí)行器原有的地址(即新增執(zhí)行器時自動注冊或手動錄入的地址)
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    // 如果是分片廣播任務(wù),則根據(jù)執(zhí)行器數(shù)量將任務(wù)發(fā)布至各個機器地址
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else { // 其他路由策略的任務(wù)發(fā)布一條即可
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }
}

processTrigger 方法主要處理以下幾件事:

  1. 初始化觸發(fā)參數(shù):TriggerParam

  2. 根據(jù)不同路由策略獲取執(zhí)行器地址

    根據(jù)不同的 executorRouteStrategy 策略獲取 ExecutorRouter骆莹,并調(diào)用 ExecutorRouter 的 route 方法選擇一個執(zhí)行器地址

    // 根據(jù)不同路由策略獲取執(zhí)行器 ip 地址
    routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
    
  3. 發(fā)送調(diào)度請求至遠程執(zhí)行器

    runExecutor 方法根據(jù)執(zhí)行器地址獲取執(zhí)行單元 ExecutorBizClient颗搂,并調(diào)用 ExecutorBizClient 的 run 方法將調(diào)度請求以 HTTP POST 形式發(fā)送,由執(zhí)行器的 EmbedServer 接受

    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }
    
  4. 保存任務(wù)執(zhí)行信息

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

    // param
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1幕垦、save log-id
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2丢氢、init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3、init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // 根據(jù)不同路由策略獲取執(zhí)行器 ip 地址
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4智嚷、trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5卖丸、collection trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6、save log trigger-info
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

7.3 執(zhí)行器接收并處理任務(wù)

執(zhí)行器接收到調(diào)度中心的調(diào)度請求時盏道,如果任務(wù)類型為 “Bean模式”稍浆,將會匹配 Spring 容器中的 “Bean模式任務(wù)”,然后調(diào)用其 execute 方法猜嘱,執(zhí)行任務(wù)邏輯衅枫。如果任務(wù)類型為 “GLUE模式”,將會加載 GLue 代碼朗伶,實例化 Java 對象弦撩,注入依賴的 Spring 服務(wù)(注意:Glue代碼中注入的Spring服務(wù),必須存在與該“執(zhí)行器”項目的Spring容器中)论皆,然后調(diào)用execute方法益楼,執(zhí)行任務(wù)邏輯。

7.3.1 EmbedServer 接收調(diào)度請求

EmbedHttpServerHandler 調(diào)用 channelRead0 方法處理任務(wù)調(diào)度請求点晴,每個請求通過線程池 bizThreadPool 新開一個線程進行處理

@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
            // to json
            String responseJson = GsonTool.toJson(responseObj);
            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}

process 方法根據(jù) uri 的不同取值調(diào)用 ExecutorBiz 不同的處理方法:

if ("/beat".equals(uri)) {
    return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
    return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
    return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
    return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
    return executorBiz.log(logParam);
} else {
    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}

7.3.2 ExecutorBiz(執(zhí)行器單元)

ExecutorBiz#run(TriggerParam triggerParam) 方法處理任務(wù)請求感凤,源碼如下

public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    // 1 根據(jù)任務(wù)模式(GlueTypeEnum)的不同進行不同的任務(wù)處理邏輯
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) { // “Bean模式” 任務(wù)

        // 根據(jù) triggerParam.getExecutorHandler() 加載任務(wù)處理器,此處 triggerParam.getExecutorHandler() 等于 @XxlJob 注解中的 value 值
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // “GLUE模式(Java)” 任務(wù)

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // 其他腳本模式任務(wù)

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // 2 處理不同的阻塞策略
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        // 丟棄后續(xù)調(diào)度:調(diào)度請求進入單機執(zhí)行器后粒督,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)陪竿,本次請求將會被丟棄并標(biāo)記為失敗
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // 覆蓋之前調(diào)度:調(diào)度請求進入單機執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)屠橄,將會終止運行中的調(diào)度任務(wù)并清空隊列族跛,然后運行本地調(diào)度任務(wù)
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // 新建或替換任務(wù)已有的 jobHandler
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // 3 將調(diào)度請求直接放入 jobThread 的觸發(fā)隊列進行異步處理
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

run() 方法主要做了以下幾件事:

  1. 根據(jù)任務(wù)模式(GlueTypeEnum)的不同進行不同的任務(wù)處理邏輯

    • Bean模式

      每個Bean模式任務(wù)都是一個Spring的Bean類實例闰挡,它被維護在“執(zhí)行器”項目的Spring容器中。任務(wù)類需要加“@JobHandler(value=”名稱”)”注解礁哄,因為“執(zhí)行器”會根據(jù)該注解識別Spring容器中的任務(wù)长酗。任務(wù)類需要繼承統(tǒng)一接口“IJobHandler”,任務(wù)邏輯在execute方法中開發(fā)姐仅,因為“執(zhí)行器”在接收到調(diào)度中心的調(diào)度請求時花枫,將會調(diào)用“IJobHandler”的execute方法,執(zhí)行任務(wù)邏輯

    • GLUE模式(Java)

      每個 “GLUE模式(Java)” 任務(wù)的代碼掏膏,實際上是“一個繼承自“IJobHandler”的實現(xiàn)類的類代碼”劳翰,“執(zhí)行器”接收到“調(diào)度中心”的調(diào)度請求時,會通過Groovy類加載器加載此代碼馒疹,實例化成Java對象佳簸,同時注入此代碼中聲明的Spring服務(wù)(請確保Glue代碼中的服務(wù)和類引用在“執(zhí)行器”項目中存在),然后調(diào)用該對象的execute方法颖变,執(zhí)行任務(wù)邏輯

    • 其他腳本任務(wù)

      腳本任務(wù)的源碼托管在調(diào)度中心生均,腳本邏輯在執(zhí)行器運行。當(dāng)觸發(fā)腳本任務(wù)時腥刹,執(zhí)行器會加載腳本源碼在執(zhí)行器機器上生成一份腳本文件马胧,然后通過Java代碼調(diào)用該腳本;并且實時將腳本輸出日志寫到任務(wù)日志文件中衔峰,從而在調(diào)度中心可以實時監(jiān)控腳本運行情況

  2. 根據(jù) blockStrategy 處理不同的阻塞策略

    • 丟棄后續(xù)調(diào)度:調(diào)度請求進入單機執(zhí)行器后佩脊,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù),本次請求將會被丟棄并標(biāo)記為失敗
    • 覆蓋之前調(diào)度:調(diào)度請求進入單機執(zhí)行器后垫卤,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)威彰,將會終止運行中的調(diào)度任務(wù)并清空隊列,然后運行本地調(diào)度任務(wù)
    if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
     /* 丟棄后續(xù)調(diào)度:調(diào)度請求進入單機執(zhí)行器后穴肘,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)歇盼,本次請求將會被丟棄并標(biāo)記為失敗 */
        if (jobThread.isRunningOrHasQueue()) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
        }
    } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { 
     /* 
         覆蓋之前調(diào)度:若當(dāng)前任務(wù)的處理線程(jobThread)有正在運行的任務(wù)或調(diào)度請求隊列不為空,則清空當(dāng)前任務(wù)處理線程
         然后創(chuàng)建一個新的 jobThread 處理當(dāng)前調(diào)度請求 
     */
        if (jobThread.isRunningOrHasQueue()) {
            removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
            jobThread = null;
        }
    } else {
        // just queue trigger
    }
    
    if (jobThread == null) {
        // registJobThread 創(chuàng)建新的任務(wù)處理進程并將已存在的任務(wù)處理進程清除
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }
    
  3. 將調(diào)度請求放入 JobThread 的阻塞隊列 triggerQueue评抚,等待后續(xù)處理

    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    
    LinkedBlockingQueue<TriggerParam> triggerQueue
    
    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
     // avoid repeat
     if (triggerLogIdSet.contains(triggerParam.getLogId())) {
         logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
         return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
     }
    
     // avoid repeat trigger for the same TRIGGER_LOG_ID
     triggerLogIdSet.add(triggerParam.getLogId());
     triggerQueue.add(triggerParam);
     return ReturnT.SUCCESS;
    }
    

    JobThread 通過 run() 方法循環(huán)處理請求豹缀,具體處理細節(jié)參見下節(jié) 7.3.4 JobThread:任務(wù)處理線程

7.3.4 JobThread(任務(wù)處理線程)

public void run() {
    // init
    handler.init();
    
    // 1 從阻塞隊列獲取調(diào)度請求并處理
    while(!toStop){
        // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
        TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
    
        // execute
        XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
        // 若任務(wù)設(shè)置超時時間,則在限定時間內(nèi)異步獲取執(zhí)行結(jié)果
        if (triggerParam.getExecutorTimeout() > 0) {
            // limit timeout
            final TriggerParam triggerParamTmp = triggerParam;
            FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                @Override
                public ReturnT<String> call() throws Exception {
                    return handler.execute(triggerParamTmp.getExecutorParams());
                }
            });
            futureThread = new Thread(futureTask);
            futureThread.start();
            // 異步獲取執(zhí)行結(jié)果
            executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
        } else {
            // 若沒有設(shè)置超時時間慨代,則同步獲取任務(wù)執(zhí)行結(jié)果
            executeResult = handler.execute(triggerParam.getExecutorParams());
        }
    
        if (executeResult == null) {
            executeResult = IJobHandler.FAIL;
        } else {
            executeResult.setMsg(
                    (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
                            ?executeResult.getMsg().substring(0, 50000).concat("...")
                            :executeResult.getMsg());
            executeResult.setContent(null); // limit obj size
        }
        XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
    
        // 2 任務(wù)處理結(jié)果回調(diào)
        if(triggerParam != null) {
            // callback handler info
            if (!toStop) {
                // commonm
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
            } else {
                // is killed
                ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
            }
        }
    }
    
    // 3 任務(wù)處理進行被殺死耿眉,隊列中剩余的調(diào)度請求進行失敗處理,并回調(diào)任務(wù)處理結(jié)果
    while(triggerQueue !=null && triggerQueue.size()>0){
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
        }
    }
    
    // destroy
    handler.destroy();
}

run() 方法主要做了以下幾件事:

  1. 從阻塞隊列獲取調(diào)度請求并處理

    // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
    TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
    // 若任務(wù)設(shè)置超時時間鱼响,則在限定時間內(nèi)異步獲取執(zhí)行結(jié)果
    if (triggerParam.getExecutorTimeout() > 0) {
        // limit timeout
        final TriggerParam triggerParamTmp = triggerParam;
        FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
            @Override
            public ReturnT<String> call() throws Exception {
                return handler.execute(triggerParamTmp.getExecutorParams());
            }
        });
        futureThread = new Thread(futureTask);
        futureThread.start();
        // 異步獲取執(zhí)行結(jié)果
        executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
    } else {
        // 若沒有設(shè)置超時時間,則同步獲取任務(wù)執(zhí)行結(jié)果
        executeResult = handler.execute(triggerParam.getExecutorParams());
    }
    
  2. 任務(wù)處理結(jié)果回調(diào)

    if (!toStop) {
        // commonm
        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
    } else {
        // is killed
        ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
    }
    

    將任務(wù)執(zhí)行結(jié)果 executeResult 放入 TriggerCallbackThread 的任務(wù)結(jié)果隊列 callBackQueue

    /**
     * job results callback queue
     */
    private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
    public static void pushCallBack(HandleCallbackParam callback){
        getInstance().callBackQueue.add(callback);
    }
    

    TriggerCallbackThread 創(chuàng)建 triggerCallbackThread 線程從 callBackQueue 中獲取 executeResult 并將其以 HTTP POST 形式發(fā)送給調(diào)度中心组底,核心處理方法見 doCallback()

    private void doCallback(List<HandleCallbackParam> callbackParamList){
        // callback, will retry if error
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
        }
    }
    
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }
    

    調(diào)度中心獲取任務(wù)結(jié)果參數(shù)后丈积,調(diào)用AdminBizImpl#callback(com.xxl.job.core.biz.model.HandleCallbackParam) 方法進行處理

    private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
        // valid log item
        XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId());
        }
    
        // trigger success, to trigger child job
        String callbackMsg = null;
        if (IJobHandler.SUCCESS.getCode() == handleCallbackParam.getExecuteResult().getCode()) {
            XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId());
            if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {
                callbackMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";
    
                String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
                for (int i = 0; i < childJobIds.length; i++) {
                    int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;
                    if (childJobId > 0) {
    
                        JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null);
                        ReturnT<String> triggerChildResult = ReturnT.SUCCESS;
    
                        // add msg
                        callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"),
                                (i+1),
                                childJobIds.length,
                                childJobIds[i],
                                (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")),
                                triggerChildResult.getMsg());
                    } else {
                        callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"),
                                (i+1),
                                childJobIds.length,
                                childJobIds[i]);
                    }
                }
    
            }
        }
    
        // handle msg
        StringBuffer handleMsg = new StringBuffer();
        if (log.getHandleMsg()!=null) {
            handleMsg.append(log.getHandleMsg()).append("<br>");
        }
        if (handleCallbackParam.getExecuteResult().getMsg() != null) {
            handleMsg.append(handleCallbackParam.getExecuteResult().getMsg());
        }
        if (callbackMsg != null) {
            handleMsg.append(callbackMsg);
        }
    
        if (handleMsg.length() > 15000) {
            handleMsg = new StringBuffer(handleMsg.substring(0, 15000));  // text最大64kb 避免長度過長
        }
    
        // success, save log
        log.setHandleTime(new Date());
        log.setHandleCode(handleCallbackParam.getExecuteResult().getCode());
        log.setHandleMsg(handleMsg.toString());
        xxlJobLogDao.updateHandleInfo(log);
    
        return ReturnT.SUCCESS;
    }
    
  3. JobThread 停止后處理隊列遺留調(diào)度請求

    while(triggerQueue !=null && triggerQueue.size()>0){
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
        }
    }
    

7.3.5 IJobHandler(任務(wù)單元)

public abstract class IJobHandler {
    
    /** success */
    public static final ReturnT<String> SUCCESS = new ReturnT<String>(200, null);
    /** fail */
    public static final ReturnT<String> FAIL = new ReturnT<String>(500, null);
    /** fail timeout */
    public static final ReturnT<String> FAIL_TIMEOUT = new ReturnT<String>(502, null);

    /**
     * execute handler, invoked when executor receives a scheduling request
     *
     * @param param
     * @return
     * @throws Exception
     */
    public abstract ReturnT<String> execute(String param) throws Exception;

    /**
     * init handler, invoked when JobThread init
     */
    public void init() throws InvocationTargetException, IllegalAccessException {
        // do something
    }

    /**
     * destroy handler, invoked when JobThread destroy
     */
    public void destroy() throws InvocationTargetException, IllegalAccessException {
        // do something
    }
}

IJobHandler 有四個實現(xiàn)類:

  • MethodJobHandler:負責(zé)處理 BEAN 模式的任務(wù)

  • GlueJobHandler:負責(zé)處理 Glue 模式的任務(wù)

  • ScriptJobHandler:負責(zé)處理腳本模式的任務(wù)

  • CommandJobHandler:負責(zé)處理命令行任務(wù)

7.4 心跳檢測

任務(wù)注冊 Beat 周期默認30s

執(zhí)行器以一倍 Beat 進行執(zhí)行器注冊

調(diào)度中心以一倍 Beat 進行動態(tài)任務(wù)發(fā)現(xiàn)

執(zhí)行器注冊信息的失效時間為三倍 Beat

執(zhí)行器銷毀筐骇,主動上報調(diào)度中心并摘除對應(yīng)的執(zhí)行器機器信息

八 XXL-JOB 改造

8.1 自動裝配

8.1.1 XxlJobAutoConfigure

XxlJobAutoConfigure 自動裝配 XxlJobSpringExecutor 到項目的 Spring 容器中

/**
 * matchIfMissing 屬性為true時籽前,配置文件中缺少對應(yīng)的value或name的對應(yīng)的屬性值庆械,也會注入成功
 */
@Configuration
@EnableConfigurationProperties(XxlJobSpringExecutorProperties.class)
@ConditionalOnClass(XxlJobSpringExecutor.class)
@ConditionalOnProperty(prefix = "xxl-job", value = "enabled", matchIfMissing = true)
public class XxlJobAutoConfigure {

    private static Logger LOGGER = LoggerFactory.getLogger(XxlJobAutoConfigure.class);

    @Autowired
    private XxlJobSpringExecutorProperties properties;

    @Bean
    @ConditionalOnMissingBean(XxlJobSpringExecutor.class)
    public XxlJobSpringExecutor xxlJobExecutor() {
        LOGGER.info(">>>>>>>>>>> xxl-job config auto init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(properties.getAdmin().getAddresses());
        xxlJobSpringExecutor.setAppname(properties.getExecutor().getAppname());
        xxlJobSpringExecutor.setAddress(properties.getExecutor().getAddress());
        xxlJobSpringExecutor.setIp(properties.getExecutor().getIp());
        xxlJobSpringExecutor.setPort(properties.getExecutor().getPort());
        xxlJobSpringExecutor.setAccessToken(properties.getAccessToken());
        xxlJobSpringExecutor.setLogPath(properties.getExecutor().getLogPath());
        xxlJobSpringExecutor.setLogRetentionDays(properties.getExecutor().getLogRetentionDays());
        xxlJobSpringExecutor.setApplicationContext(SpringUtils.getApplicationContext());

        LOGGER.info(">>>>>>>>>>> XxlJobSpringExecutor: {}", xxlJobSpringExecutor.toString());
        LOGGER.info(">>>>>>>>>>> xxl-job config auto init end.");
        return xxlJobSpringExecutor;
    }
}

8.1.2 XxlJobSpringExecutorProperties

XxlJobSpringExecutor 的屬性全部放在 XxlJobSpringExecutorProperties 中

@ConfigurationProperties(prefix = "xxl-job")
public class XxlJobSpringExecutorProperties {

    private String accessToken;

    @NestedConfigurationProperty
    private Admin admin = new Admin();

    @NestedConfigurationProperty
    private Executor executor = new Executor();

    public String getAccessToken() {
        return accessToken;
    }

    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    public Admin getAdmin() {
        return admin;
    }

    public void setAdmin(Admin admin) {
        this.admin = admin;
    }

    public Executor getExecutor() {
        return executor;
    }

    public void setExecutor(Executor executor) {
        this.executor = executor;
    }

    public static class Admin {
        private String addresses;

        public String getAddresses() {
            return addresses;
        }

        public void setAddresses(String addresses) {
            this.addresses = addresses;
        }
    }

    public static class Executor {
        private String appname;
        private String address;
        private String ip;
        private int port;
        private String logPath;
        private int logRetentionDays;

        public String getAppname() {
            return appname;
        }

        public void setAppname(String appname) {
            this.appname = appname;
        }

        public String getAddress() {
            return address;
        }

        public void setAddress(String address) {
            this.address = address;
        }

        public String getIp() {
            return ip;
        }

        public void setIp(String ip) {
            this.ip = ip;
        }

        public int getPort() {
            return port;
        }

        public void setPort(int port) {
            this.port = port;
        }

        public String getLogPath() {
            return logPath;
        }

        public void setLogPath(String logPath) {
            this.logPath = logPath;
        }

        public int getLogRetentionDays() {
            return logRetentionDays;
        }

        public void setLogRetentionDays(int logRetentionDays) {
            this.logRetentionDays = logRetentionDays;
        }
    }
}

8.1.3 配置 spring.factories

在項目的 resources\META-INF 目錄新建 spring.factories 文件,并配置自動配置類 XxlJobAutoConfigure

org.springframework.boot.autoconfigure.EnableAutoConfiguration = com.xxl.job.core.properties.auto.configure.XxlJobAutoConfigure

8.2 集成 spring-session

8.2.1 引入 spring-session 相關(guān)依賴

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.session</groupId>
      <artifactId>spring-session-bom</artifactId>
      <version>Corn-SR2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<!-- Spring Session core -->
<dependency>
    <groupId>org.springframework.session</groupId>
    <artifactId>spring-session-core</artifactId>
</dependency>

<!-- Spring Session Data Redis -->
<dependency>
    <groupId>org.springframework.session</groupId>
    <artifactId>spring-session-data-redis</artifactId>
</dependency>

<!-- Spring Boot Redis Data Starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>

8.2.2 添加相關(guān)配置

在 application.properties 文件中添加一下配置

# spring session
## equivalent to manually adding @EnableRedisHttpSession annotation
spring.session.store-type=redis
## flush-mode 有兩個參數(shù):ON_SAVE(表示在response commit前刷新緩存)棒呛,IMMEDIATE(表示只要有更新唬滑,就刷新緩存)
spring.session.redis.flush-mode=on_save
## 添加后告唆,redis中的key為spring:session:xxl-job
spring.session.redis.namespace=xxl-job

# redis config
## Redis server host.
## Redis 要開啟事件通知 redis-cli config set notify-keyspace-events Egx
spring.redis.host=192.168.99.100
## Login password of the redis server.
spring.redis.password=
## Redis server port.
spring.redis.port=16379

8.2.3 使用 HttpSession 進行邏輯處理

九 XXL-JOB 部署

環(huán)境 QA PP P
基礎(chǔ)設(shè)施 Linux 服務(wù)器(內(nèi)置 Java 環(huán)境)—— 1 臺,金橋 k8s Linux 服務(wù)器(內(nèi)置 Java 環(huán)境)—— 1 臺 金橋 k8s Linux 服務(wù)器(內(nèi)置 Java 環(huán)境)—— 2 臺 金橋 k8s
中間件 MySQL(主從)
Redis 集群(單例)
MySQL(主從)
Redis 集群(單例)
MySQL 主從
Redis 集群
其他 內(nèi)網(wǎng)域名 內(nèi)網(wǎng)域名 內(nèi)網(wǎng)域名

附錄

Cron 表達式

Cron表達式是一個字符串晶密,字符串以5或6個空格隔開擒悬,分為6或7個域,每一個域代表一個含義稻艰,Cron有如下兩種語法格式:

  • Seconds Minutes Hours DayofMonth Month DayofWeek Year
  • Seconds Minutes Hours DayofMonth Month DayofWeek

結(jié)構(gòu)

corn 從左到右(用空格隔開):秒 分 小時 月份中的日期 月份 星期中的日期 年份

字段含義

字段 允許值 允許的特殊字符
秒(Seconds) 0~59的整數(shù) , - * / 四個字符
分(Minutes 0~59的整數(shù) , - * / 四個字符
小時(Hours 0~23的整數(shù) , - * / 四個字符
日期(DayofMonth 1~31的整數(shù)(但是你需要考慮你月的天數(shù)) ,- * ? / L W C 八個字符
月份(Month 1~12的整數(shù)或者 JAN-DEC , - * / 四個字符
星期(DayofWeek 1~7的整數(shù)或者 SUN-SAT (1=SUN) , - * ? / L C # 八個字符
年(可選懂牧,留空)(Year 1970~2099 , - * / 四個字符

每一個域都使用數(shù)字,但還可以出現(xiàn)如下特殊字符尊勿,它們的含義是:

* :表示匹配該域的任意值僧凤。假如在Minutes域使用*, 即表示每分鐘都會觸發(fā)事件

? :只能用在DayofMonth和DayofWeek兩個域。它也匹配域的任意值元扔,但實際不會躯保。因為DayofMonth和DayofWeek會相互影響。例如想在每月的20日觸發(fā)調(diào)度澎语,不管20日到底是星期幾途事,則只能使用如下寫法: 13 13 15 20 * ?, 其中最后一位只能用?咏连,而不能使用*盯孙,如果使用*表示不管星期幾都會觸發(fā),實際上并不是這樣

- :表示范圍祟滴。例如在Minutes域使用5-20振惰,表示從5分到20分鐘每分鐘觸發(fā)一次 

/ :表示起始時間開始觸發(fā),然后每隔固定時間觸發(fā)一次垄懂。例如在Minutes域使用5/20,則意味著5分鐘觸發(fā)一次骑晶,而25,45等分別觸發(fā)一次

, :表示列出枚舉值草慧。例如:在Minutes域使用5,20桶蛔,則意味著在5和20分每分鐘觸發(fā)一次

L :表示最后,只能出現(xiàn)在DayofWeek和DayofMonth域漫谷。如果在DayofWeek域使用5L,意味著在最后的一個星期四觸發(fā)

W :表示有效工作日(周一到周五),只能出現(xiàn)在DayofMonth域仔雷,系統(tǒng)將在離指定日期的最近的有效工作日觸發(fā)事件。例如:在 DayofMonth使用5W,如果5日是星期六碟婆,則將在最近的工作日:星期五电抚,即4日觸發(fā)。如果5日是星期天竖共,則在6日(周一)觸發(fā)蝙叛;如果5日在星期一到星期五中的一天,則就在5日觸發(fā)公给。另外一點借帘,W的最近尋找不會跨過月份

LW :這兩個字符可以連用,表示在某個月最后一個工作日淌铐,即最后一個星期五

# :用于確定每個月第幾個星期幾肺然,只能出現(xiàn)在DayofMonth域。例如在4#2匣沼,表示某月的第二個星期三
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末狰挡,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子释涛,更是在濱河造成了極大的恐慌加叁,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件唇撬,死亡現(xiàn)場離奇詭異它匕,居然都是意外死亡,警方通過查閱死者的電腦和手機窖认,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門豫柬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人扑浸,你說我怎么就攤上這事烧给。” “怎么了喝噪?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵础嫡,是天一觀的道長。 經(jīng)常有香客問我酝惧,道長榴鼎,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任晚唇,我火速辦了婚禮巫财,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘哩陕。我一直安慰自己平项,他們只是感情好赫舒,可當(dāng)我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著闽瓢,像睡著了一般号阿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上鸳粉,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天,我揣著相機與錄音园担,去河邊找鬼届谈。 笑死,一個胖子當(dāng)著我的面吹牛弯汰,可吹牛的內(nèi)容都是我干的艰山。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼咏闪,長吁一口氣:“原來是場噩夢啊……” “哼曙搬!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鸽嫂,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤纵装,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后据某,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體橡娄,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年癣籽,在試婚紗的時候發(fā)現(xiàn)自己被綠了挽唉。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡筷狼,死狀恐怖瓶籽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情埂材,我是刑警寧澤塑顺,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站楞遏,受9級特大地震影響茬暇,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜寡喝,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一糙俗、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧预鬓,春花似錦巧骚、人聲如沸赊颠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽竣蹦。三九已至,卻和暖如春沧奴,著一層夾襖步出監(jiān)牢的瞬間痘括,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工滔吠, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留纲菌,地道東北人。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓疮绷,卻偏偏與公主長得像翰舌,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子冬骚,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,044評論 2 355