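Failover: if a job server crashes while the job is running, this does not trigger resharding; resharding only happens the next time the job starts. With failover enabled, during the current execution the other job servers detect the crash and, once idle, pick up the unfinished orphaned sharding items and execute them.
In other words, failover works in two directions: a server that has finished its own execution actively grabs unassigned sharding items, and when a server goes offline, an available server is actively sought to run its tasks.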
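When a server crashes, the ephemeral nodes tied to its ZooKeeper session are deleted, which fires the listener watching those nodes. (In my tests, failover ran once whenever a machine was shut down; I am not sure whether that is a bug.) The handling works as follows. First, the listener checks whether the removed instance is the local instance itself; if so, it does nothing. Next, it looks up the failover nodes (/{jobName}/sharding/{index}/failover) that belong to the crashed instance, i.e. the items it was itself running through failover. If there are any, each of those items is put into the failover state by creating /{jobName}/leader/failover/items/{index}; the listener then checks whether failover is needed, runs it if so, and stops otherwise. If there are none, it takes all sharding items of that job instance, creates /{jobName}/leader/failover/items/{index} for each of them, and again checks whether failover is needed, running it if so and stopping otherwise.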
class JobCrashedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
            String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
            // the removed instance is this instance itself: nothing to do
            if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                return;
            }
            // items the crashed instance was running through failover (/{jobName}/sharding/{index}/failover)
            List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
            if (!failoverItems.isEmpty()) {
                for (int each : failoverItems) {
                    // create /{jobName}/leader/failover/items/{index}
                    failoverService.setCrashedFailoverFlag(each);
                    // fail over if necessary
                    failoverService.failoverIfNecessary();
                }
            } else {
                for (int each : shardingService.getShardingItems(jobInstanceId)) {
                    // flag every sharding item the crashed instance owned for failover
                    failoverService.setCrashedFailoverFlag(each);
                    // fail over if necessary
                    failoverService.failoverIfNecessary();
                }
            }
        }
    }
}
public void setCrashedFailoverFlag(final int item) {
    if (!isFailoverAssigned(item)) {
        // create the node /{jobName}/leader/failover/items/{index}
        jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
    }
}
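To make the branching in JobCrashedJobListener concrete, here is a minimal in-memory sketch of the flagging part only. It is a model under stated assumptions, not elastic-job code: plain maps stand in for each item's normal owner and for /{jobName}/sharding/{index}/failover, a sorted set stands in for the children of /{jobName}/leader/failover/items, the names CrashHandlingSketch and onInstanceRemoved are hypothetical, and the isFailoverAssigned guard, ephemeral-node expiry and the failoverIfNecessary() call are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical in-memory model of JobCrashedJobListener's branching; not elastic-job API.
class CrashHandlingSketch {
    // item -> instance it is assigned to by normal sharding
    final Map<Integer, String> shardingOwner = new HashMap<>();
    // item -> instance running it via failover (like /{jobName}/sharding/{index}/failover)
    final Map<Integer, String> failoverOwner = new HashMap<>();
    // pending items (like the children of /{jobName}/leader/failover/items), kept sorted
    final SortedSet<Integer> pendingFailover = new TreeSet<>();

    void onInstanceRemoved(String crashedId, String localId) {
        if (crashedId.equals(localId)) {
            return; // the removed instance is this one: ignore the event
        }
        // prefer the items the crashed instance had itself taken over via failover
        List<Integer> items = itemsOwnedBy(failoverOwner, crashedId);
        if (items.isEmpty()) {
            // otherwise fall back to all items it owned through normal sharding
            items = itemsOwnedBy(shardingOwner, crashedId);
        }
        for (int item : items) {
            // adding to a set is idempotent, like createJobNodeIfNeeded (isFailoverAssigned omitted)
            pendingFailover.add(item);
        }
    }

    private static List<Integer> itemsOwnedBy(Map<Integer, String> owners, String instanceId) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, String> e : owners.entrySet()) {
            if (e.getValue().equals(instanceId)) {
                result.add(e.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        CrashHandlingSketch s = new CrashHandlingSketch();
        s.shardingOwner.put(0, "A");
        s.shardingOwner.put(1, "B");
        s.shardingOwner.put(2, "B");
        s.onInstanceRemoved("B", "A"); // instance A observes B's crash
        System.out.println(s.pendingFailover); // [1, 2]
    }
}
```

Because an instance that crashed while running failover items is handled through its failover items first, an item can be handed on more than once without falling back to the original sharding assignment.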
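During failover, the code first decides whether failover is needed at all: the /{jobName}/leader/failover/items/{index} nodes must exist and the job must not be running on this instance. If so, a leader is elected through the latch node /{jobName}/leader/failover/latch, and the leader runs the callback. Every contender waits on the same latch and gets leadership in turn once the previous holder closes it, which is why the callback re-checks needFailover() before doing any work.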
public void failoverIfNecessary() {
    if (needFailover()) {
        // elect a leader via the latch node; the callback runs while holding leadership
        jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
    }
}
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
    //CHECKSTYLE:OFF
    } catch (final Exception ex) {
    //CHECKSTYLE:ON
        handleException(ex);
    }
}
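The effect of the latch can be modeled without ZooKeeper. In this sketch a java.util.concurrent ReentrantLock stands in for Curator's LeaderLatch (an assumption for illustration only), threads play competing instances, and the re-check inside the critical section mirrors the !needFailover() check in the callback. FailoverGateSketch and takenOver are hypothetical names.

```java
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model: a ReentrantLock stands in for Curator's LeaderLatch on
// /{jobName}/leader/failover/latch; threads stand in for competing instances.
class FailoverGateSketch {
    // pending items shared by all instances (like /{jobName}/leader/failover/items)
    final ConcurrentSkipListSet<Integer> pending = new ConcurrentSkipListSet<>();
    final ReentrantLock latch = new ReentrantLock();
    // how many times a contender actually claimed an item
    final AtomicInteger takenOver = new AtomicInteger();

    // mirrors needFailover(): something is pending and the local job is idle
    boolean needFailover(boolean localJobRunning) {
        return !pending.isEmpty() && !localJobRunning;
    }

    void failoverIfNecessary(boolean localJobRunning) {
        if (!needFailover(localJobRunning)) {
            return;
        }
        latch.lock(); // roughly latch.start() + latch.await(): wait for our turn
        try {
            // re-check, as the callback does: an earlier holder may have taken the item
            if (!needFailover(localJobRunning)) {
                return;
            }
            if (pending.pollFirst() != null) {
                takenOver.incrementAndGet();
            }
        } finally {
            latch.unlock(); // roughly closing the latch: the next contender may proceed
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FailoverGateSketch s = new FailoverGateSketch();
        s.pending.add(1);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> s.failoverIfNecessary(false));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(s.takenOver.get()); // 1
    }
}
```

With four contenders and one pending item, exactly one thread claims it; the others either fail the first check or find nothing left after the re-check.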
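In the callback, the leader binds the crashed sharding item to itself by writing its own jobInstanceId into the ephemeral node /{jobName}/sharding/{index}/failover, and then deletes /{jobName}/leader/failover/items/{index}. Each callback handles exactly one item: when several items crashed, the for loop in JobCrashedJobListener calls failoverIfNecessary() once per item, so the callback only needs to take a single pending item. Finally it calls triggerJob() to run the job.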
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
    @Override
    public void execute() {
        // re-check: a previous latch holder may already have taken every pending item
        if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
            return;
        }
        // take one pending item from /{jobName}/leader/failover/items
        int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
        log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
        // bind the item to this instance: /{jobName}/sharding/{index}/failover = jobInstanceId (ephemeral)
        jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        // the item is no longer waiting for a taker
        jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
        // TODO: triggerJob should not be used here; execution should be scheduled uniformly by the executor
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (null != jobScheduleController) {
            jobScheduleController.triggerJob();
        }
    }
}
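A minimal sketch of what a single callback invocation does, again with in-memory collections standing in for the ZooKeeper nodes and a list standing in for triggerJob(). All names are hypothetical. The sketch takes the smallest pending item; which child the real code gets at index 0 depends on the order getJobNodeChildrenKeys returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical model of FailoverLeaderExecutionCallback.execute(); not elastic-job API.
class FailoverCallbackSketch {
    final SortedSet<Integer> pending = new TreeSet<>();          // like /{jobName}/leader/failover/items/*
    final Map<Integer, String> failoverOwner = new HashMap<>();  // like /{jobName}/sharding/{index}/failover
    final List<Integer> triggered = new ArrayList<>();           // stands in for triggerJob()

    void execute(String localInstanceId, boolean shutdown, boolean localJobRunning) {
        // same early exit as the real callback: shut down, or needFailover() is false
        if (shutdown || pending.isEmpty() || localJobRunning) {
            return;
        }
        int crashedItem = pending.first();
        failoverOwner.put(crashedItem, localInstanceId); // bind the item to this instance
        pending.remove(crashedItem);                     // no longer waiting for a taker
        triggered.add(crashedItem);                      // run the job for this item
    }

    public static void main(String[] args) {
        FailoverCallbackSketch s = new FailoverCallbackSketch();
        s.pending.addAll(Arrays.asList(1, 2));
        // the crash listener calls failoverIfNecessary() once per flagged item
        for (int i = 0; i < 2; i++) {
            s.execute("A", false, false);
        }
        System.out.println(s.failoverOwner + " " + s.triggered); // {1=A, 2=A} [1, 2]
    }
}
```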
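If every remaining node is running when the crash is detected, needFailover() is false on all of them and the items stay pending. Each job, however, actively calls jobFacade.failoverIfNecessary() once its execution completes. This is what guarantees the failover behavior described above: a server that has finished its execution actively grabs unassigned sharding items, and when a server goes offline, an available server is actively found to run its tasks.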
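The "pick up after finishing" path can be shown as a small timeline. This is a hypothetical model: a map of running flags replaces the job's running state in JobRegistry, and onExecutionFinished plays the role of the jobFacade.failoverIfNecessary() call made when an execution completes. The class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical timeline model of post-execution failover; not elastic-job API.
class PostExecutionFailoverSketch {
    final SortedSet<Integer> pending = new TreeSet<>();          // like /{jobName}/leader/failover/items/*
    final Map<Integer, String> failoverOwner = new TreeMap<>();  // like /{jobName}/sharding/{index}/failover
    final Map<String, Boolean> running = new HashMap<>();        // whether each instance's job is running

    // same gate as needFailover(): something is pending and the local job is idle
    void failoverIfNecessary(String instanceId) {
        if (pending.isEmpty() || running.getOrDefault(instanceId, false)) {
            return;
        }
        int item = pending.first();
        pending.remove(item);
        failoverOwner.put(item, instanceId);
    }

    // called when an instance's own execution completes
    void onExecutionFinished(String instanceId) {
        running.put(instanceId, false);
        failoverIfNecessary(instanceId);
    }

    public static void main(String[] args) {
        PostExecutionFailoverSketch s = new PostExecutionFailoverSketch();
        s.running.put("A", true);
        s.running.put("B", true);
        s.pending.add(2);            // C crashed while A and B were busy
        s.failoverIfNecessary("A");  // crash listener on A: A is running, skip
        s.failoverIfNecessary("B");  // crash listener on B: B is running, skip
        s.onExecutionFinished("B");  // B finishes first and takes item 2
        s.onExecutionFinished("A");  // nothing left for A
        System.out.println(s.failoverOwner); // {2=B}
    }
}
```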
Outside of a running execution, a server crash or a change to the sharding parameters sets the resharding flag.
class ShardingTotalCountChangedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
            int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
            // the sharding total count was changed, so the job must be resharded
            if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                // set the resharding flag
                shardingService.setReshardingFlag();
                JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
            }
        }
    }
}
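The guard in ShardingTotalCountChangedJobListener reduces to two comparisons. In this sketch a field stands in for the count cached in JobRegistry and a boolean for the resharding flag; parsing the configuration JSON is skipped, and the class name is hypothetical.

```java
// Hypothetical model of ShardingTotalCountChangedJobListener's guard; config JSON parsing omitted.
class ShardingCountChangeSketch {
    // stands in for JobRegistry's cached count; the listener ignores changes while it is 0
    int currentShardingTotalCount;
    // stands in for the flag set by shardingService.setReshardingFlag()
    boolean reshardingFlag;

    ShardingCountChangeSketch(int currentShardingTotalCount) {
        this.currentShardingTotalCount = currentShardingTotalCount;
    }

    void onConfigChanged(int newShardingTotalCount) {
        if (currentShardingTotalCount == 0) {
            return; // same as the `0 != getCurrentShardingTotalCount(jobName)` check
        }
        if (newShardingTotalCount != currentShardingTotalCount) {
            reshardingFlag = true;                             // request resharding
            currentShardingTotalCount = newShardingTotalCount; // remember the new count
        }
    }

    public static void main(String[] args) {
        ShardingCountChangeSketch s = new ShardingCountChangeSketch(3);
        s.onConfigChanged(3); // another config field changed, count unchanged: no flag
        System.out.println(s.reshardingFlag); // false
        s.onConfigChanged(5);
        System.out.println(s.reshardingFlag + " " + s.currentShardingTotalCount); // true 5
    }
}
```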
class ListenServersChangedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        // the job is not shut down, and either a job instance changed or a server's state changed
        if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
            shardingService.setReshardingFlag();
        }
    }
    // when a server crashes, its instance node fires a NODE_REMOVED event, which counts as an instance change and so triggers resharding
    private boolean isInstanceChange(final Type eventType, final String path) {
        return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
    }
    private boolean isServerChange(final String path) {
        return serverNode.isServerPath(path);
    }
}
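The resharding condition in ListenServersChangedJobListener can be written as a pure predicate. The enum below is a hypothetical stand-in for the listener's event Type, and the path prefixes /{jobName}/instances/ and /{jobName}/servers/ are assumptions about where instance and server nodes live, not taken from the text above.

```java
// Hypothetical model of ListenServersChangedJobListener's condition; not elastic-job API.
class ServersChangedSketch {
    // hypothetical stand-in for the listener's event Type
    enum Type { NODE_ADDED, NODE_UPDATED, NODE_REMOVED }

    static boolean shouldReshard(String jobName, boolean jobShutdown, Type eventType, String path) {
        if (jobShutdown) {
            return false; // a shut-down job never requests resharding
        }
        // assumed layout: /{jobName}/instances/{id} and /{jobName}/servers/{ip}
        boolean instanceChange = path.startsWith("/" + jobName + "/instances/") && eventType != Type.NODE_UPDATED;
        boolean serverChange = path.startsWith("/" + jobName + "/servers/");
        return instanceChange || serverChange;
    }

    public static void main(String[] args) {
        // a crash removes the instance node
        System.out.println(shouldReshard("demo", false, Type.NODE_REMOVED, "/demo/instances/i1")); // true
        // updating an existing instance node does not count
        System.out.println(shouldReshard("demo", false, Type.NODE_UPDATED, "/demo/instances/i1")); // false
    }
}
```

An update to an instance node is ignored, while any event on a server node counts; a crash, which removes the instance node, therefore sets the flag.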