項(xiàng)目中用到了elastic-job,自己用的時候也是根據(jù)別人的例子復(fù)制下來县爬,直接模仿用的墙基,一直也沒去理解它的工作原理软族,一知半解的,各個參數(shù)意思也不是很懂残制,然后今天突然想了解下它的工作原理立砸,看了官網(wǎng),以及網(wǎng)上的一些文章初茶,在這里做個記錄颗祝。
elastic-job是什么?
elastic-job是什么呢恼布?下面是摘自官網(wǎng)的話:
Elastic-Job是一個分布式調(diào)度解決方案螺戳,由兩個相互獨(dú)立的子項(xiàng)目Elastic-Job-Lite和Elastic-Job-Cloud組成。Elastic-Job-Lite定位為輕量級無中心化解決方案折汞,使用jar包的形式提供分布式任務(wù)的協(xié)調(diào)服務(wù)倔幼;Job-Cloud采用自研Mesos Framework的解決方案,額外提供資源治理爽待、應(yīng)用分發(fā)以及進(jìn)程隔離等功能(PS:我在這里只說Elastic-Job-Lite损同,因?yàn)镴ob-Cloud我沒去研究)。
簡單的說Elastic-Job-Lite就是一個分布式定時任務(wù)鸟款。
怎么用elastic-job
使用elastic-job很簡單膏燃,在elastic-job官網(wǎng)上也有例子。
- 添加maven依賴
<!-- 引入elastic-job-lite核心模塊 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<!-- 使用springframework自定義命名空間時引入 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elasticjob.version}</version>
</dependency>
2.寫定時任務(wù)類
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int item = shardingContext.getShardingItem();
// System.out.println(item);
switch (item) {
case 0:
// do something by sharding item 0
System.out.println(String.format("-----ThreadId:%s,當(dāng)前分片項(xiàng):%s",Thread.currentThread().getId(),item));
break;
case 1:
// do something by sharding item 1
System.out.println(String.format("-----ThreadId:%s,當(dāng)前分片項(xiàng):%s",Thread.currentThread().getId(),item));
break;
case 2:
// do something by sharding item 2
System.out.println(String.format("-----ThreadId:%s,當(dāng)前分片項(xiàng):%s",Thread.currentThread().getId(),item));
break;
}
}
}
job有好幾種作業(yè)類型何什,這里只是簡單的作業(yè)组哩,實(shí)現(xiàn)SimpleJob 即可
3.配置作業(yè)
與spring容器配合作業(yè),把bean依賴注入
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<!--配置作業(yè)注冊中心 -->
<reg:zookeeper id="jobRegesterCenter" server-lists="192.168.3.98:2181" namespace="lfp-elastic-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
<!-- 配置作業(yè)-->
<job:simple id="testJob" class="cn.laikyy.lfp.job.TestJob" registry-center-ref="jobRegesterCenter" cron="0 50 23 * * ?"
disabled="false" overwrite="true" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" description="測試" />
</beans>
OK,配置好后我們在elastic-job-lite-console控制臺去觸發(fā)執(zhí)行,看看輸出結(jié)果伶贰。
輸出結(jié)果:
-----ThreadId:81,當(dāng)前分片項(xiàng):0
-----ThreadId:82,當(dāng)前分片項(xiàng):1
-----ThreadId:83,當(dāng)前分片項(xiàng):2
好現(xiàn)在我們來分析一下輸出結(jié)果:
首先對于testJob我們配了sharding-total-count="3"蛛砰,意思是分3片,但是我們現(xiàn)在只有一臺機(jī)器執(zhí)行幕袱,所以輸出結(jié)果是當(dāng)前機(jī)器用了三個線程去跑這個定時任務(wù)暴备,每個線程對應(yīng)一個分片。
好们豌,那么如果我們用兩臺機(jī)器去執(zhí)行那么會怎么樣呢涯捻?由于elastic-job實(shí)例是根據(jù)ip來的,所以我這邊需要通過虛擬機(jī)或另一臺來執(zhí)行望迎,那么結(jié)果會怎么樣呢障癌?
實(shí)例1:
-----ThreadId:81,當(dāng)前分片項(xiàng):0
實(shí)例2:
-----ThreadId:92,當(dāng)前分片項(xiàng):1
-----ThreadId:93,當(dāng)前分片項(xiàng):2
我們可以看到,3片分到兩臺機(jī)器辩尊,有一臺執(zhí)行了兩個分片項(xiàng)涛浙,一臺執(zhí)行一個分片項(xiàng);當(dāng)然如果三臺實(shí)例去執(zhí)行摄欲,那么結(jié)果會是每臺執(zhí)行一個分片項(xiàng)轿亮;如果四臺機(jī)器去分3個任務(wù),那么最終執(zhí)行也是3臺胸墙,每臺執(zhí)行一個分片項(xiàng)我注,還有一臺空閑。
當(dāng)然當(dāng)某一臺掛了迟隅,那么調(diào)度中心就會自動把掛了的那臺的任務(wù)分給空閑的實(shí)例但骨。如果沒有空閑的實(shí)例那么就會把掛了的那臺的任務(wù)分給其他的一臺上去。
現(xiàn)實(shí)場景怎么用智袭?
好奔缠,我們大致了解了如何使用elastic-job,那如何應(yīng)用到我們實(shí)際的項(xiàng)目里呢吼野?比方說余額寶每天定時給用戶計算收益校哎。如果兩臺機(jī)器,我們可以把用戶id是奇數(shù)的給一臺機(jī)器去執(zhí)行瞳步,偶數(shù)的給另一臺執(zhí)行闷哆。像這樣
@Override
public void execute(ShardingContext shardingContext) {
int item = shardingContext.getShardingItem();
switch (item) {
case 0:
//1 查詢用戶id是奇數(shù)的用戶
//2 計算的收益
break;
case 1:
//1 查詢用戶id是偶數(shù)的用戶
//2 計算的收益
break;
}
sql查詢:
SELECT * FROM lfp_user WHERE mod(id,2)=0; //查詢id為偶數(shù)的用戶
SELECT * FROM lfp_user WHERE mod(id,2)=1; //查詢id為奇數(shù)的用戶
這樣的話,就把壓力平均分?jǐn)偟絻膳_服務(wù)器上去了谚攒,而且也能更快執(zhí)行完成阳准。
但是如果發(fā)現(xiàn)這段時間多了很多人氛堕,以前只要兩臺機(jī)器在1個小時就能跑完的馏臭,現(xiàn)在5個小時也跑不完,怎么辦呢?加機(jī)器括儒?加機(jī)器肯定是必須的绕沈,但是我們發(fā)現(xiàn)代碼里寫死了,分2片帮寻,我們總不能去改成3乍狐、4、5固逗,萬一以后還有更多呢浅蚪,所以我們可以
對sql進(jìn)一步優(yōu)化。我們把分片數(shù)和當(dāng)前分片項(xiàng)傳到sql烫罩,這樣sql可以動態(tài)去查詢對應(yīng)分片后的用戶了
改進(jìn)后sql:
//動態(tài)查詢該分片下要執(zhí)行的用戶
SELECT * FROM lfp_user WHERE mod(id,#{shardingTotalCount})=#{shardingItem};
改進(jìn)后代碼:
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int shardingTotalCount = shardingContext.getShardingTotalCount();
int shardingItem = shardingContext.getShardingItem();
//1 查詢改分片下要執(zhí)行的用戶惜傲,帶參數(shù)shardingTotalCount和shardingItem
//2 計算這些用戶收益
}
}
改進(jìn)后我們的代碼也簡潔了,當(dāng)shardingTotalCount 有5片贝攒,并且5臺機(jī)器
機(jī)器0:查詢用戶id能整除5的盗誊,如:5、10隘弊、15哈踱、20、梨熙、开镣、、
機(jī)器1:查詢用戶id除以5余1的串结,如:1哑子、6、11肌割、16卧蜓、、把敞、弥奸、
機(jī)器2:查詢用戶id除以5余2的,如:2奋早、7盛霎、12、17耽装、愤炸、、掉奄、
機(jī)器3:查詢用戶id除以5于余3的规个,如:3、8、13诞仓、18缤苫、、墅拭、活玲、
機(jī)器4:查詢用戶id除以5余4的,如:4谍婉、9舒憾、14、19穗熬、珍剑、、死陆、
同理招拙,當(dāng)6臺、7臺等措译,這樣我們就可以把服務(wù)器壓力分?jǐn)偟矫恳慌_上去了别凤,如果臨時壓力增大,我們還可以繼續(xù)加機(jī)器來緩解领虹,當(dāng)不需要這么多時我們可以減少機(jī)器實(shí)現(xiàn)分布式部署规哪。
這里只是記錄一下自己使用elastic-job,具體的實(shí)現(xiàn)與原理我也還沒有深入去了解,這段時間我也要深入了解下它的實(shí)現(xiàn)塌衰,這篇文章就寫到這诉稍。祝大家周末愉快。