XXL-JOB是一個分布式任務(wù)調(diào)度平臺休玩,其核心設(shè)計目標(biāo)是開發(fā)迅速躏结、學(xué)習(xí)簡單、輕量級彰居、易擴展〕现剑現(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用
文檔及源碼
源碼倉庫地址
源碼倉庫地址 | Release Download |
---|---|
https://github.com/xuxueli/xxl-job | Download |
http://gitee.com/xuxueli0323/xxl-job | Download |
一陈惰、調(diào)度中心部署
1.1 初始化“調(diào)度數(shù)據(jù)庫”
數(shù)據(jù)庫中根據(jù)“調(diào)度數(shù)據(jù)庫初始化SQL腳本”創(chuàng)建調(diào)度數(shù)據(jù)庫
“調(diào)度數(shù)據(jù)庫初始化SQL腳本” 位置為:
調(diào)度中心支持集群部署畦徘,集群情況下各節(jié)點務(wù)必連接同一個mysql實例
如果mysql做主從,調(diào)度中心集群節(jié)點務(wù)必強制走主庫
1.2 部署 xxl-job-admin(調(diào)度中心)
統(tǒng)一管理任務(wù)調(diào)度平臺上調(diào)度任務(wù),負責(zé)觸發(fā)調(diào)度執(zhí)行抬闯,并且提供任務(wù)管理平臺
1.2.1 調(diào)度中心配置
### 調(diào)度中心JDBC鏈接
spring.datasource.url=jdbc:mysql://database_ip:port/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=your_username
spring.datasource.password=your_password
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
### 報警郵箱
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### 調(diào)度中心通訊TOKEN:非空時啟用井辆;
xxl.job.accessToken=
### 調(diào)度中心國際化配置:"zh_CN"/中文簡體, "zh_TC"/中文繁體 and "en"/英文;
xxl.job.i18n=zh_CN
## 調(diào)度線程池最大線程配置
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
### 調(diào)度中心日志表數(shù)據(jù)保存天數(shù):過期日志自動清理溶握;限制大于等于7時生效杯缺,否則, 如-1,關(guān)閉自動清理功能睡榆;
xxl.job.logretentiondays=30
1.2.2 調(diào)度中心部署
如果已經(jīng)正確進行上述配置萍肆,可將項目編譯打包部署,本例使用的測試服務(wù)器 ip 為:10.91.198.13胀屿,端口為 18080
調(diào)度中心訪問地址:http://10.91.198.13:18080/xxl-job-admin (該地址執(zhí)行器將會使用到塘揣,作為回調(diào)地址)
默認登錄賬號 “admin/123456”, 登錄后運行界面如下圖所示
1.3 調(diào)度中心集群
調(diào)度中心集群部署時,有以下注意點:
- DB配置保持一致
- 集群機器時鐘保持一致(單機集群忽視)
- 建議:推薦通過nginx為調(diào)度中心集群做負載均衡宿崭,分配域名亲铡。調(diào)度中心訪問、執(zhí)行器回調(diào)配置、調(diào)用API服務(wù)等操作均通過該域名進行
二奖蔓、現(xiàn)有項目接入
2.1 引入POM
<!-- xxl-job-core -->
<dependency>
<groupId>com.saicmotor.com</groupId>
<artifactId>sp-xxl-job-core</artifactId>
<version>1.0.0</version>
</dependency>
2.2 添加 xxl-job 配置項
# 項目端口
server.port=8081
# no web
#spring.main.web-environment=false
# 日志配置
logging.config=classpath:logback.xml
### 調(diào)度中心部署跟地址 [選填]:如調(diào)度中心集群部署存在多個地址則用逗號分隔赞草。執(zhí)行器將會使用該地址進行"執(zhí)行器心跳注冊"和"任務(wù)結(jié)果回調(diào)";為空則關(guān)閉自動注冊
xxl.job.admin.addresses=http://10.91.198.13:18080/xxl-job-admin
### 執(zhí)行器通訊TOKEN [選填]:非空時啟用
xxl.job.accessToken=
### 執(zhí)行器AppName [選填]:執(zhí)行器心跳注冊分組依據(jù)吆鹤;為空則關(guān)閉自動注冊
xxl.job.executor.appname=executor-test-lzn
### 執(zhí)行器注冊 [選填]:優(yōu)先使用該配置作為注冊地址厨疙,為空時使用內(nèi)嵌服務(wù) ”IP:PORT“ 作為注冊地址。從而更靈活的支持容器類型執(zhí)行器動態(tài)IP和動態(tài)映射端口問題
xxl.job.executor.address=http://項目ip:執(zhí)行器端口號
### 執(zhí)行器IP [選填]:默認為空表示自動獲取IP檀头,多網(wǎng)卡時可手動設(shè)置指定IP轰异,該IP不會綁定Host僅作為通訊實用;地址信息用于 "執(zhí)行器注冊" 和 "調(diào)度中心請求并觸發(fā)任務(wù)"
xxl.job.executor.ip=項目ip
### 執(zhí)行器端口號 [選填]:小于等于0則自動獲仁钍肌;默認端口為9999婴削,單機部署多個執(zhí)行器時廊镜,注意要配置不同執(zhí)行器端口
xxl.job.executor.port=9999
### 執(zhí)行器運行日志文件存儲磁盤路徑 [選填] :需要對該路徑擁有讀寫權(quán)限;為空則使用默認路徑
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 執(zhí)行器日志文件保存天數(shù) [選填] : 過期日志自動清理, 限制值大于等于3時生效; 否則, 如-1, 關(guān)閉自動清理功能
xxl.job.executor.logretentiondays=30
2.3 添加執(zhí)行器組件配置(現(xiàn)已自動裝配)
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
2.4 自定義定時任務(wù)
開發(fā)步驟:
- 在 Spring Bean 實例中唉俗,開發(fā) Job 方法嗤朴,方式格式要求為 "public ReturnT<String> execute(String param)"
- 為 Job 方法添加注解 "@XxlJob(value="自定義 jobhandler 名稱", init = "JobHandler 初始化方法", destroy = "JobHandler 銷毀方法")",注解value值對應(yīng)的是調(diào)度中心新建任務(wù)的 JobHandler 屬性的值
/**
* XxlJob開發(fā)示例
*/
@Component
public class SampleXxlJobPools {
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
@XxlJob(value = "testJobHandler", init = "init", destroy = "destroy")
public ReturnT<String> demoJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World, Hello zll");
return ReturnT.SUCCESS;
}
public void init() {
logger.info("testJobHandler init");
}
public void destroy() {
logger.info("testJobHandler destory");
}
}
2.5 配置執(zhí)行器
進入調(diào)度中心:http://10.91.198.13:18080/xxl-job-admin
進入執(zhí)行器管理虫溜,點擊操作 -> 編輯雹姊,進入執(zhí)行器編輯頁面
2.6 配置及執(zhí)行任務(wù)
進入調(diào)度中心:http://10.91.198.13:18080/xxl-job-admin
進入任務(wù)管理,點擊新增衡楞,進入新增任務(wù)頁面
任務(wù)配置參數(shù)釋義如下:
參數(shù) | 說明 |
---|---|
執(zhí)行器 | 任務(wù)的綁定的執(zhí)行器吱雏,任務(wù)觸發(fā)調(diào)度時將會自動發(fā)現(xiàn)注冊成功的執(zhí)行器, 實現(xiàn)任務(wù)自動發(fā)現(xiàn)功能; 另一方面也可以方便的進行任務(wù)分組。每個任務(wù)必須綁定一個執(zhí)行器, 可在 "執(zhí)行器管理" 進行設(shè)置 |
任務(wù)描述 | 任務(wù)的描述信息瘾境,便于任務(wù)管理 |
路由策略 | FIRST(第一個):固定選擇第一個機器歧杏; LAST(最后一個):固定選擇最后一個機器; ROUND(輪詢):輪詢迷守; RANDOM(隨機):隨機選擇在線的機器犬绒; CONSISTENT_HASH(一致性HASH):每個任務(wù)按照Hash算法固定選擇某一臺機器,且所有任務(wù)均勻散列在不同機器上兑凿; LEAST_FREQUENTLY_USED(最不經(jīng)常使用):使用頻率最低的機器優(yōu)先被選舉凯力; LEAST_RECENTLY_USED(最近最久未使用):最久未使用的機器優(yōu)先被選舉; FAILOVER(故障轉(zhuǎn)移):按照順序依次進行心跳檢測礼华,第一個心跳檢測成功的機器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度咐鹤; BUSYOVER(忙碌轉(zhuǎn)移):按照順序依次進行空閑檢測,第一個空閑檢測成功的機器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度卓嫂; SHARDING_BROADCAST(分片廣播):廣播觸發(fā)對應(yīng)集群中所有機器執(zhí)行一次任務(wù)慷暂,同時系統(tǒng)自動傳遞分片參數(shù);可根據(jù)分片參數(shù)開發(fā)分片任務(wù); |
Cron | 觸發(fā)任務(wù)執(zhí)行的Cron表達式 |
運行模式 | BEAN模式:任務(wù)以JobHandler方式維護在執(zhí)行器端行瑞;需要結(jié)合 "JobHandler" 屬性匹配執(zhí)行器中任務(wù)奸腺; GLUE模式(Java):任務(wù)以源碼方式維護在調(diào)度中心;該模式的任務(wù)實際上是一段繼承自IJobHandler的Java類代碼并 "groovy" 源碼方式維護血久,它在執(zhí)行器項目中運行突照,可使用@Resource/@Autowire注入執(zhí)行器里中的其他服務(wù); GLUE模式(Shell):任務(wù)以源碼方式維護在調(diào)度中心氧吐;該模式的任務(wù)實際上是一段 "shell" 腳本讹蘑; GLUE模式(Python):任務(wù)以源碼方式維護在調(diào)度中心;該模式的任務(wù)實際上是一段 "python" 腳本筑舅; GLUE模式(PHP):任務(wù)以源碼方式維護在調(diào)度中心座慰;該模式的任務(wù)實際上是一段 "php" 腳本; GLUE模式(NodeJS):任務(wù)以源碼方式維護在調(diào)度中心翠拣;該模式的任務(wù)實際上是一段 "nodejs" 腳本版仔; GLUE模式(PowerShell):任務(wù)以源碼方式維護在調(diào)度中心;該模式的任務(wù)實際上是一段 "PowerShell" 腳本误墓; |
JobHandler | 行模式為 "BEAN模式" 時生效蛮粮,對應(yīng)執(zhí)行器中新開發(fā)的 JobHandler 類 “@JobHandler” 注解自定義的 value 值 |
阻塞處理策略 | 調(diào)度過于密集執(zhí)行器來不及處理時的處理策略; 單機串行(默認):調(diào)度請求進入單機執(zhí)行器后谜慌,調(diào)度請求進入FIFO隊列并以串行方式運行然想; 丟棄后續(xù)調(diào)度:調(diào)度請求進入單機執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)欣范,本次請求將會被丟棄并標(biāo)記為失敱湫埂; 覆蓋之前調(diào)度:調(diào)度請求進入單機執(zhí)行器后熙卡,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)杖刷,將會終止運行中的調(diào)度任務(wù)并清空隊列,然后運行本地調(diào)度任務(wù)驳癌; |
子任務(wù) | 每個任務(wù)都擁有一個唯一的任務(wù)ID(任務(wù)ID可以從任務(wù)列表獲取)滑燃,當(dāng)本任務(wù)執(zhí)行結(jié)束并且執(zhí)行成功時,將會觸發(fā)子任務(wù)ID所對應(yīng)的任務(wù)的一次主動調(diào)度 |
任務(wù)超時時間 | 支持自定義任務(wù)超時時間颓鲜,任務(wù)運行超時將會主動中斷任務(wù) |
失敗重試次數(shù) | 支持自定義任務(wù)失敗重試次數(shù)表窘,當(dāng)任務(wù)失敗時將會按照預(yù)設(shè)的失敗重試次數(shù)主動進行重試 |
報警郵件 | 任務(wù)調(diào)度失敗時郵件通知的郵箱地址,支持配置多郵箱地址甜滨,配置多個郵箱地址時用逗號分隔 |
負責(zé)人 | 任務(wù)的負責(zé)人 |
任務(wù)參數(shù) | 任務(wù)執(zhí)行所需的參數(shù) |
配置任務(wù)后乐严,右擊選擇執(zhí)行一次進行任務(wù)調(diào)度測試
任務(wù)執(zhí)行后進入調(diào)度日志,可以查看任務(wù)執(zhí)行日志
至此項目的任務(wù)配置完成
2.7 執(zhí)行器集群
執(zhí)行器集群部署時衣摩,幾點要求和建議:
- 執(zhí)行器回調(diào)地址(xxl.job.admin.addresses)需要保持一致昂验;執(zhí)行器根據(jù)該配置進行執(zhí)行器自動注冊等操作
- 同一個執(zhí)行器集群內(nèi)AppName(xxl.job.executor.appname)需要保持一致;調(diào)度中心根據(jù)該配置動態(tài)發(fā)現(xiàn)不同集群的在線執(zhí)行器列表
三、用戶管理
進入 “用戶管理” 界面既琴,可查看和管理用戶信息占婉;
目前用戶分為兩種角色:
- 管理員:擁有全量權(quán)限,支持在線管理用戶信息甫恩,為用戶分配權(quán)限逆济,權(quán)限分配粒度為執(zhí)行器;
- 普通用戶:僅擁有被分配權(quán)限的執(zhí)行器磺箕,及相關(guān)任務(wù)的操作權(quán)限奖慌;
四、任務(wù)詳解
4.1 BEAN模式(類形式)
Bean模式任務(wù)松靡,支持基于方法的開發(fā)方式简僧,每個任務(wù)對應(yīng)一個方法。
- 優(yōu)點:
- 每個任務(wù)只需要開發(fā)一個方法击困,并添加”@XxlJob”注解即可涎劈,更加方便、快速阅茶。
- 支持自動掃描任務(wù)并注入到執(zhí)行器容器。
- 缺點:要求Spring容器環(huán)境谅海;
4.1.1 開發(fā)方法詳解
- 在Spring Bean實例中脸哀,開發(fā)Job方法,方式格式要求為 "public ReturnT<String> execute(String param)"
- 為Job方法添加注解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")"扭吁,注解value值對應(yīng)的是調(diào)度中心新建任務(wù)的JobHandler屬性的值
- 執(zhí)行日志:需要通過 "XxlJobLogger.log" 打印執(zhí)行日志
4.1.2 內(nèi)置 Bean 模式任務(wù)
為方便用戶參考與快速實用撞蜂,示例執(zhí)行器內(nèi)原生提供多個Bean模式任務(wù)Handler,可以直接配置實用侥袜,如下:
shardingJobHandler:分片示例任務(wù)蝌诡,任務(wù)內(nèi)部模擬處理分片參數(shù),可參考熟悉分片任務(wù)
-
httpJobHandler:通用HTTP任務(wù)Handler枫吧;業(yè)務(wù)方只需要提供HTTP鏈接等信息即可浦旱,不限制語言傀蚌、平臺厘熟。示例任務(wù)入?yún)⑷缦拢?/p>
url: http://www.xxx.com method: get 或 post data: post-data
commandJobHandler:通用命令行任務(wù)Handler;業(yè)務(wù)方只需要提供命令行即可搁胆;如 “pwd”命令
4.2 GLUE模式(Java)
任務(wù)以源碼方式維護在調(diào)度中心例隆,支持通過 Web IDE 在線更新甥捺,實時編譯和生效,因此不需要指定 JobHandler
4.2.1 基本開發(fā)流程
調(diào)度中心镀层,新建調(diào)度任務(wù)
參考新建的任務(wù)進行參數(shù)配置镰禾,運行模式選中 “GLUE模式(Java)”
開發(fā)任務(wù)代碼
選中指定任務(wù),點擊該任務(wù)右側(cè)“GLUE”按鈕,將會前往GLUE任務(wù)的Web IDE界面吴侦,在該界面支持對任務(wù)代碼進行開發(fā)(也可以在IDE中開發(fā)完成后屋休,復(fù)制粘貼到編輯中)。
版本回溯功能(支持30個版本的版本回溯):在GLUE任務(wù)的Web IDE界面妈倔,選擇右上角下拉框“版本回溯”博投,會列出該GLUE的更新歷史,選擇相應(yīng)版本即可顯示該版本代碼盯蝴,保存后GLUE代碼即回退到對應(yīng)的歷史版本毅哗;
執(zhí)行任務(wù)
在任務(wù)管理器中執(zhí)行任務(wù)即可
4.2.2 注入執(zhí)行器中的 Bean 實例
在線編輯的代碼可以通過 @Autowired
or @Resource
注解使用執(zhí)行器中注冊在 spring 容器中的 bean 實例,具體配置如下:
添加 SpringUtils 獲取 IOC 容器
/**
* 獲取 Spring IOC 容器
*
*/
public class SpringUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtils.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
}
添加測試服務(wù)類
@Service
public class TestService {
@Value("lzn")
private String name;
public TestService() {
}
public TestService(String name) {
this.name = name;
}
public String testHello() {
return "testHello: " + name;
}
}
修改 XxlJobConfig 配置類
為 XxlJobSpringExecutor 設(shè)置 ApplicationContext
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
xxlJobSpringExecutor.setApplicationContext(SpringUtils.getApplicationContext());
return xxlJobSpringExecutor;
}
任務(wù)代碼修改如下
任務(wù)執(zhí)行后捧挺,執(zhí)行日志如下虑绵,可以看到 TestService#testHello() 方法成功注入并執(zhí)行
4.3 GLUE模式(Shell)
待補充
五、XXL-JOB VS. Kangaroo ETL
5.1 ETL
ETL(Extraction-Transformation-Loading)用來描述將數(shù)據(jù)從來源端經(jīng)過抽让隼印(extract)翅睛、轉(zhuǎn)換(transform)、加載(load)至目的端的過程黑竞。ETL負責(zé)將分布的捕发、異構(gòu)數(shù)據(jù)源中的數(shù)據(jù)如關(guān)系數(shù)據(jù)、平面數(shù)據(jù)文件等抽取到臨時中間層后進行清洗很魂、轉(zhuǎn)換扎酷、集成,最后加載到數(shù)據(jù)倉庫或數(shù)據(jù)集市中遏匆,成為聯(lián)機分析處理法挨、數(shù)據(jù)挖掘的基礎(chǔ)。
ETL 是數(shù)據(jù)倉庫獲取高質(zhì)量數(shù)據(jù)的關(guān)鍵環(huán)節(jié)幅聘,也是BI(商業(yè)智能)項目最重要的一個環(huán)節(jié)凡纳,可以說,ETL設(shè)計的好壞直接關(guān)系到BI項目的成敗帝蒿。
ETL是一個長期的過程荐糜,通過對分散在各業(yè)務(wù)系統(tǒng)中相互關(guān)聯(lián)的分布式異構(gòu)數(shù)據(jù)進行提取(Extract)陵叽、清洗狞尔、轉(zhuǎn)換(Transform)和加載(Load),使這些數(shù)據(jù)成為BI系統(tǒng)需要的有用數(shù)據(jù)巩掺。
5.2 ETL 流程
5.2.1. 數(shù)據(jù)提绕颉(Data Extract)
數(shù)據(jù)提取指的是從不同的網(wǎng)絡(luò)、操作平臺胖替、應(yīng)用研儒、數(shù)據(jù)庫和數(shù)據(jù)格式中抽取數(shù)據(jù)的過程豫缨。
在這個過程中,首先需要根據(jù)實際業(yè)務(wù)需求進行抽取字段的確定端朵,形成一張公共需求的清單好芭,同時清單中的需求字段也應(yīng)當(dāng)和數(shù)據(jù)庫中的字段形成一一映射關(guān)系,這樣通過數(shù)據(jù)抽取得到的數(shù)據(jù)都能夠整齊劃一冲呢,為后續(xù)數(shù)據(jù)轉(zhuǎn)換和加載提供基礎(chǔ)舍败。
具體實現(xiàn)過程中,將會涉及到如何解決從不同類型的數(shù)據(jù)庫(如Oracle敬拓、MySQL邻薯、DB2)、不同類型的文件系統(tǒng)(如Linux乘凸、Windows)以何種方式(如數(shù)據(jù)庫抽取厕诡、文件傳輸、流式)营勤、何種頻率(分鐘灵嫌、小時、天葛作、周寿羞、月等)獲取數(shù)據(jù)的問題。
5.2.2 數(shù)據(jù)轉(zhuǎn)換(Data Transformation)
數(shù)據(jù)轉(zhuǎn)換是指將數(shù)據(jù)從一種表現(xiàn)形式變?yōu)榱硪环N表現(xiàn)形式的過程赂蠢,即對數(shù)據(jù)進行整合稠曼、拆分和變換。
- 數(shù)據(jù)整合是指通過多表關(guān)聯(lián)客年,將不同類型數(shù)據(jù)之間可能存在潛在關(guān)聯(lián)關(guān)系的多條數(shù)據(jù)進行合并,通過數(shù)據(jù)的整合漠吻,豐富數(shù)據(jù)維度量瓜,有利于發(fā)現(xiàn)更多有價值的信息。
- 數(shù)據(jù)拆分是指按一定規(guī)則對數(shù)據(jù)進行拆分途乃,將一條數(shù)據(jù)拆分為多條绍傲。
- 數(shù)據(jù)變換是指對數(shù)據(jù)進行行列轉(zhuǎn)換、排序耍共、修改序號烫饼、去除重復(fù)記錄變換操作。
數(shù)據(jù)轉(zhuǎn)換一般包括兩類工作:
一種是對數(shù)據(jù)名稱及格式進行統(tǒng)一试读,另一種則是數(shù)據(jù)倉庫中可能會有源數(shù)據(jù)庫中不存在的數(shù)據(jù)杠纵,因此需要進行字段的組合、分割或計算钩骇。
數(shù)據(jù)轉(zhuǎn)換其實還包含了數(shù)據(jù)清洗的工作比藻,根據(jù)業(yè)務(wù)規(guī)則對異常數(shù)據(jù)進行清洗铝量,將不完整、錯誤或重復(fù)的數(shù)據(jù)進行相應(yīng)處理银亲,以保證后續(xù)分析結(jié)果的準確性慢叨。
5.2.3 數(shù)據(jù)加載(Data Loading)
數(shù)據(jù)加載的主要任務(wù)是將清洗過后的干凈的數(shù)據(jù)集按物理數(shù)據(jù)模型定義的表結(jié)構(gòu)裝入目標(biāo)數(shù)據(jù)倉庫的數(shù)據(jù)表中,并允許人工干預(yù)务蝠,以及提供強大的錯誤報告拍谐、系統(tǒng)日志、數(shù)據(jù)備份與恢復(fù)功能馏段。
數(shù)據(jù)加載的方式主要有:增量加載(時間戳方式轩拨、日志表方式、全表比對方式)毅弧、全量加載(全表刪除再插入方式)气嫁。整個操作過程中往往要跨網(wǎng)絡(luò)、跨操作平臺够坐。實際工作中寸宵,數(shù)據(jù)加載需要結(jié)合使用的數(shù)據(jù)庫系統(tǒng),確定最優(yōu)的數(shù)據(jù)加載方案元咙,節(jié)約CPU梯影、硬盤IO和網(wǎng)絡(luò)傳輸資源。
5.3 指標(biāo)對比
XXL-JOB | Kangaroo ETL | |
---|---|---|
定位 | 面向系統(tǒng)定時任務(wù)的任務(wù)調(diào)度平臺 | 面向數(shù)據(jù)服務(wù)的任務(wù)調(diào)度平臺 |
任務(wù)類型 | 1 Bean模式:每個Bean模式任務(wù)都是一個Spring的Bean類實例庶香,它被維護在“執(zhí)行器”項目的Spring容器中 2 GLUE模式(Java):通過Groovy類加載器加載此代碼甲棍,實例化成Java對象,執(zhí)行任務(wù)邏輯 3 GLUE模式(Shell) + GLUE模式(Python) + GLUE模式(PHP) + GLUE模式(NodeJS) + GLUE模式(Powershell):腳本任務(wù)的源碼托管在調(diào)度中心赶掖,腳本邏輯在執(zhí)行器運行感猛。通過Java代碼調(diào)用腳本,在調(diào)度中心可以實時監(jiān)控腳本運行情況 |
1 DX 任務(wù):支持 Oracle奢赂、MySQL陪白、SQL Server、Text 文件膳灶、HDFS(RCFile,TextFile,Orc 格式)5 種數(shù)據(jù)源之間的數(shù)據(jù)交換 2 Script 任務(wù):支持 shell 腳本咱士,通常用于執(zhí)行 HiveSQL、SparkSQL轧钓、Jar 等 |
系統(tǒng)侵入性 | 有侵入 | 無侵入 |
系統(tǒng)兼容性 | 兼容 Spring序厉、Dubbo等主流框架。針對非Java應(yīng)用毕箍,可借助 XXL-JOB 的標(biāo)準 RESTful API 方便的實現(xiàn)多語言支持 | - |
適用場景 | 將系統(tǒng)的定時任務(wù)和調(diào)度解耦弛房,任務(wù)調(diào)度交給統(tǒng)一平臺進行管理,提高系統(tǒng)可用性和穩(wěn)定性 | 將分布的霉晕、異構(gòu)的數(shù)據(jù)源中的數(shù)據(jù)通過合理的任務(wù)調(diào)度進行抽取庭再、轉(zhuǎn)換和加載捞奕,為后續(xù)的數(shù)據(jù)分析、數(shù)據(jù)挖掘做基礎(chǔ) |
六拄轻、總體設(shè)計
6.1 架構(gòu)設(shè)計
6.1.1 系統(tǒng)組成
-
調(diào)度模塊(調(diào)度中心)
負責(zé)管理調(diào)度信息颅围,按照調(diào)度配置發(fā)出調(diào)度請求,自身不承擔(dān)業(yè)務(wù)代碼恨搓。調(diào)度系統(tǒng)與任務(wù)解耦院促,提高了系統(tǒng)可用性和穩(wěn)定性,同時調(diào)度系統(tǒng)性能不再受限于任務(wù)模塊斧抱;
支持可視化常拓、簡單且動態(tài)的管理調(diào)度信息,包括任務(wù)新建辉浦,更新弄抬,刪除,GLUE開發(fā)和任務(wù)報警等宪郊,所有上述操作都會實時生效掂恕,同時支持監(jiān)控調(diào)度結(jié)果以及執(zhí)行日志,支持執(zhí)行器Failover弛槐; -
執(zhí)行模塊(執(zhí)行器)
負責(zé)接收調(diào)度請求并執(zhí)行任務(wù)邏輯懊亡。任務(wù)模塊專注于任務(wù)的執(zhí)行等操作,開發(fā)和維護更加簡單和高效乎串;
接收“調(diào)度中心”的執(zhí)行請求店枣、終止請求和日志請求等;
6.1.2 架構(gòu)圖
6.2 任務(wù)注冊和自動發(fā)現(xiàn)
- AppName: 每個執(zhí)行器機器集群的唯一標(biāo)示, 任務(wù)注冊以 "執(zhí)行器" 為最小粒度進行注冊; 每個任務(wù)通過其綁定的執(zhí)行器可感知對應(yīng)的執(zhí)行器機器列表
- 注冊表: 見"xxl_job_registry"表, "執(zhí)行器" 在進行任務(wù)注冊時將會周期性維護一條注冊記錄叹誉,即機器地址和AppName的綁定關(guān)系; "調(diào)度中心" 從而可以動態(tài)感知每個AppName在線的機器列表
- 執(zhí)行器注冊: 任務(wù)注冊Beat周期默認30s; 執(zhí)行器以一倍Beat進行執(zhí)行器注冊, 調(diào)度中心以一倍Beat進行動態(tài)任務(wù)發(fā)現(xiàn); 注冊信息的失效時間為三倍Beat
- 執(zhí)行器注冊摘除:執(zhí)行器銷毀時鸯两,將會主動上報調(diào)度中心并摘除對應(yīng)的執(zhí)行器機器信息,提高心跳注冊的實時性
七 源碼解析
7.1 XxlJobSpringExecutor 啟動
項目啟動后獲取配置文件中的屬性值并實例化配置類 XxlJobSpringExecutor长豁,XxlJobSpringExecutor 實現(xiàn)了 SmartInitializingSingleton 接口的 afterSingletonsInstantiated 方法甩卓,在實例化 XxlJobSpringExecutor 后觸發(fā)執(zhí)行,XxlJobSpringExecutor 在該方法里做了兩件事:
調(diào)用 initJobHandlerMethodRepository 方法掃描項目中帶 @XxlJob 注解的方法(即 jobHandler)并注冊;
-
調(diào)用父類 XxlJobExecutor 的 start 方法啟動 Executor缀棍,并初始化核心組件:
初始化 admin 控制臺:initAdminBizList(adminAddresses, accessToken)
初始化日志清理進程 JobLogFileCleanThread:JobLogFileCleanThread.getInstance().start(logRetentionDays)
初始化觸發(fā)回調(diào)進程 TriggerCallbackThread:TriggerCallbackThread.getInstance().start()
初始化內(nèi)置服務(wù) executor-server:initEmbedServer(address, ip, port, appname, accessToken)
7.1.1 jobHandler 注冊
在項目啟動時宅此,執(zhí)行器會通過 “@JobHandler” 識別 Spring 容器中 “Bean模式任務(wù)”,以注解的 value 屬性為 key 管理起來爬范,保存至 jobHandlerRepository父腕。
1 initJobHandlerMethodRepository(ApplicationContext applicationContext)
該方法掃描項目中的類,將帶有 @XxlJob 注解的 job 進行解析青瀑,并調(diào)用 registJobHandler(String name, IJobHandler jobHandler) 進行注冊:
// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
// registry jobhandler
/**
* name: @XxlJob 的 value, jobHandler 的名字
* bean:帶 @XxlJob 方法對應(yīng)的類名
* method: 帶 @XxlJob 注解的方法名
* initMethod: @XxlJob 的 init 值對應(yīng)方法的全限定名璧亮,即 execute 方法調(diào)用前執(zhí)行的方法
* destroyMethod: @XxlJob 的 destroy 值對應(yīng)方法的全限定名萧诫,即 execute 方法調(diào)用后執(zhí)行的方法
*/
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
2 MethodJobHandler
public class MethodJobHandler extends IJobHandler {
private final Object target;
private final Method method;
private Method initMethod;
private Method destroyMethod;
public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
this.target = target;
this.method = method;
this.initMethod =initMethod;
this.destroyMethod =destroyMethod;
}
@Override
public ReturnT<String> execute(String param) throws Exception {
return (ReturnT<String>) method.invoke(target, new Object[]{param});
}
@Override
public void init() throws InvocationTargetException, IllegalAccessException {
if(initMethod != null) {
initMethod.invoke(target);
}
}
@Override
public void destroy() throws InvocationTargetException, IllegalAccessException {
if(destroyMethod != null) {
destroyMethod.invoke(target);
}
}
}
7.1.2 XxlJobSpringExecutor 初始化
7.1.2.1 初始化 admin 控制臺
initAdminBizList 實例化 AdminBizClient,并將其存在 adminBizList 中
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
// 多個 admin 控制臺地址以 "," 分隔
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
7.1.2.2 初始化日志清理進程
7.1.2.3 初始化觸發(fā)回調(diào)進程
7.1.2.4 初始化內(nèi)置服務(wù)
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
initEmbedServer 方法內(nèi)部直接調(diào)用 EmbedServer#start 方法
public void start(final String address, final int port, final String appname, final String accessToken) {
// 1 初始化 executorBiz 執(zhí)行單元和線程池
executorBiz = new ExecutorBizImpl();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
// 2 啟動 ServerBootstrap 并綁定端口號
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
// 3 將執(zhí)行器信息注冊到調(diào)度中心(admin 控制臺)
startRegistry(appname, address);
// 4 wait util stop
future.channel().closeFuture().sync();
}
EmbedServer 的 start 方法主要處理以下幾件事:
初始化 executorBiz 和線程池 bizThreadPool
-
啟動 ServerBootstrap 并綁定端口號
調(diào)度中心實際的調(diào)度請求由 EmbedHttpServerHandler 處理
-
將執(zhí)行器信息注冊到調(diào)度中心(admin 控制臺)
public void startRegistry(final String appname, final String address) { // start registry ExecutorRegistryThread.getInstance().start(appname, address); }
ExecutorRegistryThread#start 方法封裝執(zhí)行器信息枝嘶,并調(diào)用 AdminBizClient#registry 方法將執(zhí)行器信息注冊于調(diào)度中心帘饶,核心代碼如下:
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { // AdminBizClient 實現(xiàn) AdminBiz 接口,將執(zhí)行器信息進行注冊 ReturnT<String> registryResult = adminBiz.registry(registryParam); if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { registryResult = ReturnT.SUCCESS; logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); break; } else { logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); } } catch (Exception e) { logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e); } }
7.2 調(diào)度平臺發(fā)布任務(wù)
前端頁面處罰具體任務(wù)后群扶,轉(zhuǎn)發(fā)至 JobInfoController 的 triggerJob 方法
/**
* 處理觸發(fā)請求
*
* @param id 任務(wù) id
* @param executorParam 執(zhí)行器參數(shù)
* @param addressList 執(zhí)行器地址
* @return
*/
@RequestMapping("/trigger")
@ResponseBody
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
return ReturnT.SUCCESS;
}
JobTriggerPoolHelper 的 trigger 方法調(diào)用 addTrigger 方法及刻,addTrigger 方法將調(diào)度請求交給線程池處理,線程池中通過 XxlJobTrigger 的 trigger 方法處理實際請求
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
// 獲取任務(wù)詳細信息
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid竞阐,jobId={}", jobId);
return;
}
// 設(shè)置任務(wù)執(zhí)行參數(shù)
if (executorParam != null) {
jobInfo.setExecutorParam(executorParam);
}
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
// 獲取執(zhí)行器詳細信息
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// 如果手動設(shè)置執(zhí)行任務(wù)機器的地址缴饭,則覆蓋執(zhí)行器原有的地址(即新增執(zhí)行器時自動注冊或手動錄入的地址)
if (addressList!=null && addressList.trim().length()>0) {
group.setAddressType(1);
group.setAddressList(addressList.trim());
}
// sharding param
int[] shardingParam = null;
if (executorShardingParam!=null){
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
// 如果是分片廣播任務(wù),則根據(jù)執(zhí)行器數(shù)量將任務(wù)發(fā)布至各個機器地址
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else { // 其他路由策略的任務(wù)發(fā)布一條即可
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
processTrigger 方法主要處理以下幾件事:
初始化觸發(fā)參數(shù):TriggerParam
-
根據(jù)不同路由策略獲取執(zhí)行器地址
根據(jù)不同的 executorRouteStrategy 策略獲取 ExecutorRouter骆莹,并調(diào)用 ExecutorRouter 的 route 方法選擇一個執(zhí)行器地址
// 根據(jù)不同路由策略獲取執(zhí)行器 ip 地址 routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
-
發(fā)送調(diào)度請求至遠程執(zhí)行器
runExecutor 方法根據(jù)執(zhí)行器地址獲取執(zhí)行單元 ExecutorBizClient颗搂,并調(diào)用 ExecutorBizClient 的 run 方法將調(diào)度請求以 HTTP POST 形式發(fā)送,由執(zhí)行器的 EmbedServer 接受
public ReturnT<String> run(TriggerParam triggerParam) { return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class); }
保存任務(wù)執(zhí)行信息
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
// 1幕垦、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2丢氢、init trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// 3、init address
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
// 根據(jù)不同路由策略獲取執(zhí)行器 ip 地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4智嚷、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
// 5卖丸、collection trigger info
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
if (shardingParam != null) {
triggerMsgSb.append("("+shardingParam+")");
}
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
.append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
// 6、save log trigger-info
jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorShardingParam(shardingParam);
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
7.3 執(zhí)行器接收并處理任務(wù)
執(zhí)行器接收到調(diào)度中心的調(diào)度請求時盏道,如果任務(wù)類型為 “Bean模式”稍浆,將會匹配 Spring 容器中的 “Bean模式任務(wù)”,然后調(diào)用其 execute 方法猜嘱,執(zhí)行任務(wù)邏輯衅枫。如果任務(wù)類型為 “GLUE模式”,將會加載 GLue 代碼朗伶,實例化 Java 對象弦撩,注入依賴的 Spring 服務(wù)(注意:Glue代碼中注入的Spring服務(wù),必須存在與該“執(zhí)行器”項目的Spring容器中)论皆,然后調(diào)用execute方法益楼,執(zhí)行任務(wù)邏輯。
7.3.1 EmbedServer 接收調(diào)度請求
EmbedHttpServerHandler 調(diào)用 channelRead0 方法處理任務(wù)調(diào)度請求点晴,每個請求通過線程池 bizThreadPool 新開一個線程進行處理
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// invoke
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// do invoke
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json
String responseJson = GsonTool.toJson(responseObj);
// write response
writeResponse(ctx, keepAlive, responseJson);
}
});
}
process 方法根據(jù) uri 的不同取值調(diào)用 ExecutorBiz 不同的處理方法:
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
7.3.2 ExecutorBiz(執(zhí)行器單元)
ExecutorBiz#run(TriggerParam triggerParam) 方法處理任務(wù)請求感凤,源碼如下
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
// 1 根據(jù)任務(wù)模式(GlueTypeEnum)的不同進行不同的任務(wù)處理邏輯
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) { // “Bean模式” 任務(wù)
// 根據(jù) triggerParam.getExecutorHandler() 加載任務(wù)處理器,此處 triggerParam.getExecutorHandler() 等于 @XxlJob 注解中的 value 值
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // “GLUE模式(Java)” 任務(wù)
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // 其他腳本模式任務(wù)
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change script or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
}
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
// 2 處理不同的阻塞策略
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
// 丟棄后續(xù)調(diào)度:調(diào)度請求進入單機執(zhí)行器后粒督,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)陪竿,本次請求將會被丟棄并標(biāo)記為失敗
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // 覆蓋之前調(diào)度:調(diào)度請求進入單機執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)屠橄,將會終止運行中的調(diào)度任務(wù)并清空隊列族跛,然后運行本地調(diào)度任務(wù)
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// 新建或替換任務(wù)已有的 jobHandler
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// 3 將調(diào)度請求直接放入 jobThread 的觸發(fā)隊列進行異步處理
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
run() 方法主要做了以下幾件事:
-
根據(jù)任務(wù)模式(GlueTypeEnum)的不同進行不同的任務(wù)處理邏輯
-
Bean模式
每個Bean模式任務(wù)都是一個Spring的Bean類實例闰挡,它被維護在“執(zhí)行器”項目的Spring容器中。任務(wù)類需要加“@JobHandler(value=”名稱”)”注解礁哄,因為“執(zhí)行器”會根據(jù)該注解識別Spring容器中的任務(wù)长酗。任務(wù)類需要繼承統(tǒng)一接口“IJobHandler”,任務(wù)邏輯在execute方法中開發(fā)姐仅,因為“執(zhí)行器”在接收到調(diào)度中心的調(diào)度請求時花枫,將會調(diào)用“IJobHandler”的execute方法,執(zhí)行任務(wù)邏輯
-
GLUE模式(Java)
每個 “GLUE模式(Java)” 任務(wù)的代碼掏膏,實際上是“一個繼承自“IJobHandler”的實現(xiàn)類的類代碼”劳翰,“執(zhí)行器”接收到“調(diào)度中心”的調(diào)度請求時,會通過Groovy類加載器加載此代碼馒疹,實例化成Java對象佳簸,同時注入此代碼中聲明的Spring服務(wù)(請確保Glue代碼中的服務(wù)和類引用在“執(zhí)行器”項目中存在),然后調(diào)用該對象的execute方法颖变,執(zhí)行任務(wù)邏輯
-
其他腳本任務(wù)
腳本任務(wù)的源碼托管在調(diào)度中心生均,腳本邏輯在執(zhí)行器運行。當(dāng)觸發(fā)腳本任務(wù)時腥刹,執(zhí)行器會加載腳本源碼在執(zhí)行器機器上生成一份腳本文件马胧,然后通過Java代碼調(diào)用該腳本;并且實時將腳本輸出日志寫到任務(wù)日志文件中衔峰,從而在調(diào)度中心可以實時監(jiān)控腳本運行情況
-
-
根據(jù) blockStrategy 處理不同的阻塞策略
- 丟棄后續(xù)調(diào)度:調(diào)度請求進入單機執(zhí)行器后佩脊,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù),本次請求將會被丟棄并標(biāo)記為失敗
- 覆蓋之前調(diào)度:調(diào)度請求進入單機執(zhí)行器后垫卤,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)威彰,將會終止運行中的調(diào)度任務(wù)并清空隊列,然后運行本地調(diào)度任務(wù)
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { /* 丟棄后續(xù)調(diào)度:調(diào)度請求進入單機執(zhí)行器后穴肘,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務(wù)歇盼,本次請求將會被丟棄并標(biāo)記為失敗 */ if (jobThread.isRunningOrHasQueue()) { return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { /* 覆蓋之前調(diào)度:若當(dāng)前任務(wù)的處理線程(jobThread)有正在運行的任務(wù)或調(diào)度請求隊列不為空,則清空當(dāng)前任務(wù)處理線程 然后創(chuàng)建一個新的 jobThread 處理當(dāng)前調(diào)度請求 */ if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); jobThread = null; } } else { // just queue trigger } if (jobThread == null) { // registJobThread 創(chuàng)建新的任務(wù)處理進程并將已存在的任務(wù)處理進程清除 jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); }
-
將調(diào)度請求放入 JobThread 的阻塞隊列 triggerQueue评抚,等待后續(xù)處理
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); LinkedBlockingQueue<TriggerParam> triggerQueue public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) { // avoid repeat if (triggerLogIdSet.contains(triggerParam.getLogId())) { logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId()); return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId()); } // avoid repeat trigger for the same TRIGGER_LOG_ID triggerLogIdSet.add(triggerParam.getLogId()); triggerQueue.add(triggerParam); return ReturnT.SUCCESS; }
JobThread 通過 run() 方法循環(huán)處理請求豹缀,具體處理細節(jié)參見下節(jié) 7.3.4 JobThread:任務(wù)處理線程
7.3.4 JobThread(任務(wù)處理線程)
public void run() {
// init
handler.init();
// 1 從阻塞隊列獲取調(diào)度請求并處理
while(!toStop){
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
// execute
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
// 若任務(wù)設(shè)置超時時間,則在限定時間內(nèi)異步獲取執(zhí)行結(jié)果
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout
final TriggerParam triggerParamTmp = triggerParam;
FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
@Override
public ReturnT<String> call() throws Exception {
return handler.execute(triggerParamTmp.getExecutorParams());
}
});
futureThread = new Thread(futureTask);
futureThread.start();
// 異步獲取執(zhí)行結(jié)果
executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} else {
// 若沒有設(shè)置超時時間慨代,則同步獲取任務(wù)執(zhí)行結(jié)果
executeResult = handler.execute(triggerParam.getExecutorParams());
}
if (executeResult == null) {
executeResult = IJobHandler.FAIL;
} else {
executeResult.setMsg(
(executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
?executeResult.getMsg().substring(0, 50000).concat("...")
:executeResult.getMsg());
executeResult.setContent(null); // limit obj size
}
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
// 2 任務(wù)處理結(jié)果回調(diào)
if(triggerParam != null) {
// callback handler info
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
}
// 3 任務(wù)處理進行被殺死耿眉,隊列中剩余的調(diào)度請求進行失敗處理,并回調(diào)任務(wù)處理結(jié)果
while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
// destroy
handler.destroy();
}
run() 方法主要做了以下幾件事:
-
從阻塞隊列獲取調(diào)度請求并處理
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); // 若任務(wù)設(shè)置超時時間鱼响,則在限定時間內(nèi)異步獲取執(zhí)行結(jié)果 if (triggerParam.getExecutorTimeout() > 0) { // limit timeout final TriggerParam triggerParamTmp = triggerParam; FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() { @Override public ReturnT<String> call() throws Exception { return handler.execute(triggerParamTmp.getExecutorParams()); } }); futureThread = new Thread(futureTask); futureThread.start(); // 異步獲取執(zhí)行結(jié)果 executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS); } else { // 若沒有設(shè)置超時時間,則同步獲取任務(wù)執(zhí)行結(jié)果 executeResult = handler.execute(triggerParam.getExecutorParams()); }
-
任務(wù)處理結(jié)果回調(diào)
if (!toStop) { // commonm TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult)); } else { // is killed ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]"); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult)); }
將任務(wù)執(zhí)行結(jié)果 executeResult 放入 TriggerCallbackThread 的任務(wù)結(jié)果隊列 callBackQueue
/** * job results callback queue */ private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>(); public static void pushCallBack(HandleCallbackParam callback){ getInstance().callBackQueue.add(callback); }
TriggerCallbackThread 創(chuàng)建 triggerCallbackThread 線程從 callBackQueue 中獲取 executeResult 并將其以 HTTP POST 形式發(fā)送給調(diào)度中心组底,核心處理方法見 doCallback()
private void doCallback(List<HandleCallbackParam> callbackParamList){ // callback, will retry if error for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { ReturnT<String> callbackResult = adminBiz.callback(callbackParamList); } } public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class); }
調(diào)度中心獲取任務(wù)結(jié)果參數(shù)后丈积,調(diào)用AdminBizImpl#callback(com.xxl.job.core.biz.model.HandleCallbackParam) 方法進行處理
private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) { // valid log item XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId()); } // trigger success, to trigger child job String callbackMsg = null; if (IJobHandler.SUCCESS.getCode() == handleCallbackParam.getExecuteResult().getCode()) { XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId()); if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { callbackMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>"; String[] childJobIds = xxlJobInfo.getChildJobId().split(","); for (int i = 0; i < childJobIds.length; i++) { int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; if (childJobId > 0) { JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); ReturnT<String> triggerChildResult = ReturnT.SUCCESS; // add msg callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), (i+1), childJobIds.length, childJobIds[i], (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), triggerChildResult.getMsg()); } else { callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), (i+1), childJobIds.length, childJobIds[i]); } } } } // handle msg StringBuffer handleMsg = new StringBuffer(); if (log.getHandleMsg()!=null) { handleMsg.append(log.getHandleMsg()).append("<br>"); } if (handleCallbackParam.getExecuteResult().getMsg() != null) { handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); } if (callbackMsg != null) { handleMsg.append(callbackMsg); } if (handleMsg.length() > 15000) { handleMsg = new StringBuffer(handleMsg.substring(0, 15000)); // text最大64kb 避免長度過長 } // success, save log log.setHandleTime(new Date()); log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); log.setHandleMsg(handleMsg.toString()); xxlJobLogDao.updateHandleInfo(log); return ReturnT.SUCCESS; }
-
JobThread 停止后處理隊列遺留調(diào)度請求
while(triggerQueue !=null && triggerQueue.size()>0){ TriggerParam triggerParam = triggerQueue.poll(); if (triggerParam!=null) { // is killed ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]"); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult)); } }
7.3.5 IJobHandler(任務(wù)單元)
public abstract class IJobHandler {
/** success */
public static final ReturnT<String> SUCCESS = new ReturnT<String>(200, null);
/** fail */
public static final ReturnT<String> FAIL = new ReturnT<String>(500, null);
/** fail timeout */
public static final ReturnT<String> FAIL_TIMEOUT = new ReturnT<String>(502, null);
/**
* execute handler, invoked when executor receives a scheduling request
*
* @param param
* @return
* @throws Exception
*/
public abstract ReturnT<String> execute(String param) throws Exception;
/**
* init handler, invoked when JobThread init
*/
public void init() throws InvocationTargetException, IllegalAccessException {
// do something
}
/**
* destroy handler, invoked when JobThread destroy
*/
public void destroy() throws InvocationTargetException, IllegalAccessException {
// do something
}
}
IJobHandler 有四個實現(xiàn)類:
MethodJobHandler:負責(zé)處理 BEAN 模式的任務(wù)
GlueJobHandler:負責(zé)處理 Glue 模式的任務(wù)
ScriptJobHandler:負責(zé)處理腳本模式的任務(wù)
CommandJobHandler:負責(zé)處理命令行任務(wù)
7.4 心跳檢測
任務(wù)注冊 Beat 周期默認30s
執(zhí)行器以一倍 Beat 進行執(zhí)行器注冊
調(diào)度中心以一倍 Beat 進行動態(tài)任務(wù)發(fā)現(xiàn)
執(zhí)行器注冊信息的失效時間為三倍 Beat
執(zhí)行器銷毀筐骇,主動上報調(diào)度中心并摘除對應(yīng)的執(zhí)行器機器信息
八 XXL-JOB 改造
8.1 自動裝配
8.1.1 XxlJobAutoConfigure
XxlJobAutoConfigure 自動裝配 XxlJobSpringExecutor 到項目的 Spring 容器中
/**
* matchIfMissing 屬性為true時籽前,配置文件中缺少對應(yīng)的value或name的對應(yīng)的屬性值庆械,也會注入成功
*/
@Configuration
@EnableConfigurationProperties(XxlJobSpringExecutorProperties.class)
@ConditionalOnClass(XxlJobSpringExecutor.class)
@ConditionalOnProperty(prefix = "xxl-job", value = "enabled", matchIfMissing = true)
public class XxlJobAutoConfigure {
private static Logger LOGGER = LoggerFactory.getLogger(XxlJobAutoConfigure.class);
@Autowired
private XxlJobSpringExecutorProperties properties;
@Bean
@ConditionalOnMissingBean(XxlJobSpringExecutor.class)
public XxlJobSpringExecutor xxlJobExecutor() {
LOGGER.info(">>>>>>>>>>> xxl-job config auto init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(properties.getAdmin().getAddresses());
xxlJobSpringExecutor.setAppname(properties.getExecutor().getAppname());
xxlJobSpringExecutor.setAddress(properties.getExecutor().getAddress());
xxlJobSpringExecutor.setIp(properties.getExecutor().getIp());
xxlJobSpringExecutor.setPort(properties.getExecutor().getPort());
xxlJobSpringExecutor.setAccessToken(properties.getAccessToken());
xxlJobSpringExecutor.setLogPath(properties.getExecutor().getLogPath());
xxlJobSpringExecutor.setLogRetentionDays(properties.getExecutor().getLogRetentionDays());
xxlJobSpringExecutor.setApplicationContext(SpringUtils.getApplicationContext());
LOGGER.info(">>>>>>>>>>> XxlJobSpringExecutor: {}", xxlJobSpringExecutor.toString());
LOGGER.info(">>>>>>>>>>> xxl-job config auto init end.");
return xxlJobSpringExecutor;
}
}
8.1.2 XxlJobSpringExecutorProperties
XxlJobSpringExecutor 的屬性全部放在 XxlJobSpringExecutorProperties 中
@ConfigurationProperties(prefix = "xxl-job")
public class XxlJobSpringExecutorProperties {
private String accessToken;
@NestedConfigurationProperty
private Admin admin = new Admin();
@NestedConfigurationProperty
private Executor executor = new Executor();
public String getAccessToken() {
return accessToken;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public Admin getAdmin() {
return admin;
}
public void setAdmin(Admin admin) {
this.admin = admin;
}
public Executor getExecutor() {
return executor;
}
public void setExecutor(Executor executor) {
this.executor = executor;
}
public static class Admin {
private String addresses;
public String getAddresses() {
return addresses;
}
public void setAddresses(String addresses) {
this.addresses = addresses;
}
}
public static class Executor {
private String appname;
private String address;
private String ip;
private int port;
private String logPath;
private int logRetentionDays;
public String getAppname() {
return appname;
}
public void setAppname(String appname) {
this.appname = appname;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public int getLogRetentionDays() {
return logRetentionDays;
}
public void setLogRetentionDays(int logRetentionDays) {
this.logRetentionDays = logRetentionDays;
}
}
}
8.1.3 配置 spring.factories
在項目的 resources\META-INF 目錄新建 spring.factories 文件,并配置自動配置類 XxlJobAutoConfigure
org.springframework.boot.autoconfigure.EnableAutoConfiguration = com.xxl.job.core.properties.auto.configure.XxlJobAutoConfigure
8.2 集成 spring-session
8.2.1 引入 spring-session 相關(guān)依賴
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-bom</artifactId>
<version>Corn-SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Spring Session core -->
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-core</artifactId>
</dependency>
<!-- Spring Session Data Redis -->
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
<!-- Spring Boot Redis Data Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
8.2.2 添加相關(guān)配置
在 application.properties 文件中添加一下配置
# spring session
## equivalent to manually adding @EnableRedisHttpSession annotation
spring.session.store-type=redis
## flush-mode 有兩個參數(shù):ON_SAVE(表示在response commit前刷新緩存)棒呛,IMMEDIATE(表示只要有更新唬滑,就刷新緩存)
spring.session.redis.flush-mode=on_save
## 添加后告唆,redis中的key為spring:session:xxl-job
spring.session.redis.namespace=xxl-job
# redis config
## Redis server host.
## Redis 要開啟事件通知 redis-cli config set notify-keyspace-events Egx
spring.redis.host=192.168.99.100
## Login password of the redis server.
spring.redis.password=
## Redis server port.
spring.redis.port=16379
8.2.3 使用 HttpSession 進行邏輯處理
略
九 XXL-JOB 部署
環(huán)境 | QA | PP | P |
---|---|---|---|
基礎(chǔ)設(shè)施 | Linux 服務(wù)器(內(nèi)置 Java 環(huán)境)—— 1 臺,金橋 k8s | Linux 服務(wù)器(內(nèi)置 Java 環(huán)境)—— 1 臺 金橋 k8s | Linux 服務(wù)器(內(nèi)置 Java 環(huán)境)—— 2 臺 金橋 k8s |
中間件 | MySQL(主從) Redis 集群(單例) |
MySQL(主從) Redis 集群(單例) |
MySQL 主從 Redis 集群 |
其他 | 內(nèi)網(wǎng)域名 | 內(nèi)網(wǎng)域名 | 內(nèi)網(wǎng)域名 |
附錄
Cron 表達式
Cron表達式是一個字符串晶密,字符串以5或6個空格隔開擒悬,分為6或7個域,每一個域代表一個含義稻艰,Cron有如下兩種語法格式:
- Seconds Minutes Hours DayofMonth Month DayofWeek Year
- Seconds Minutes Hours DayofMonth Month DayofWeek
結(jié)構(gòu)
corn 從左到右(用空格隔開):秒 分 小時 月份中的日期 月份 星期中的日期 年份
字段含義
字段 | 允許值 | 允許的特殊字符 |
---|---|---|
秒(Seconds) | 0~59的整數(shù) | , - * / 四個字符 |
分(Minutes) | 0~59的整數(shù) | , - * / 四個字符 |
小時(Hours) | 0~23的整數(shù) | , - * / 四個字符 |
日期(DayofMonth) | 1~31的整數(shù)(但是你需要考慮你月的天數(shù)) | ,- * ? / L W C 八個字符 |
月份(Month) | 1~12的整數(shù)或者 JAN-DEC | , - * / 四個字符 |
星期(DayofWeek) | 1~7的整數(shù)或者 SUN-SAT (1=SUN) | , - * ? / L C # 八個字符 |
年(可選懂牧,留空)(Year) | 1970~2099 | , - * / 四個字符 |
每一個域都使用數(shù)字,但還可以出現(xiàn)如下特殊字符尊勿,它們的含義是:
* :表示匹配該域的任意值僧凤。假如在Minutes域使用*, 即表示每分鐘都會觸發(fā)事件
? :只能用在DayofMonth和DayofWeek兩個域。它也匹配域的任意值元扔,但實際不會躯保。因為DayofMonth和DayofWeek會相互影響。例如想在每月的20日觸發(fā)調(diào)度澎语,不管20日到底是星期幾途事,則只能使用如下寫法: 13 13 15 20 * ?, 其中最后一位只能用?咏连,而不能使用*盯孙,如果使用*表示不管星期幾都會觸發(fā),實際上并不是這樣
- :表示范圍祟滴。例如在Minutes域使用5-20振惰,表示從5分到20分鐘每分鐘觸發(fā)一次
/ :表示起始時間開始觸發(fā),然后每隔固定時間觸發(fā)一次垄懂。例如在Minutes域使用5/20,則意味著5分鐘觸發(fā)一次骑晶,而25,45等分別觸發(fā)一次
, :表示列出枚舉值草慧。例如:在Minutes域使用5,20桶蛔,則意味著在5和20分每分鐘觸發(fā)一次
L :表示最后,只能出現(xiàn)在DayofWeek和DayofMonth域漫谷。如果在DayofWeek域使用5L,意味著在最后的一個星期四觸發(fā)
W :表示有效工作日(周一到周五),只能出現(xiàn)在DayofMonth域仔雷,系統(tǒng)將在離指定日期的最近的有效工作日觸發(fā)事件。例如:在 DayofMonth使用5W,如果5日是星期六碟婆,則將在最近的工作日:星期五电抚,即4日觸發(fā)。如果5日是星期天竖共,則在6日(周一)觸發(fā)蝙叛;如果5日在星期一到星期五中的一天,則就在5日觸發(fā)公给。另外一點借帘,W的最近尋找不會跨過月份
LW :這兩個字符可以連用,表示在某個月最后一個工作日淌铐,即最后一個星期五
# :用于確定每個月第幾個星期幾肺然,只能出現(xiàn)在DayofMonth域。例如在4#2匣沼,表示某月的第二個星期三