elasticJob 源碼解析之失效轉移FailOver

失效轉移: 運行中的作業(yè)服務器崩潰不會導致重新分片挺尿,只會在下次作業(yè)啟動時分片场躯。啟用失效轉移功能可以在本次作業(yè)執(zhí)行過程中堪侯,監(jiān)測其他作業(yè)服務器空閑,抓取未完成的孤兒分片項執(zhí)行苦酱。

實現(xiàn)失效轉移功能,在某臺服務器執(zhí)行完畢后主動抓取未分配的分片给猾,并且在某臺服務器下線后主動尋找可用的服務器執(zhí)行任務疫萤。

當系統(tǒng)crashed的時候,和zk連接的臨時節(jié)點會被刪除敢伸,從而觸發(fā)對臨時節(jié)點的監(jiān)控.(實測下來扯饶,只要機器關閉,都會執(zhí)行一次failover,不知道是不是bug)具體是這樣實現(xiàn)的尾序,首先會判斷是否是自己機器接收到該事件钓丰,如果是,則不處理每币。然后獲取失效節(jié)點(/{jobName}/sharding/{index}/failover)携丁,若失效節(jié)點不為null,則將該分片狀態(tài)置為failover狀態(tài)(創(chuàng)建{jobName}/leader/failover/items/{index}節(jié)點)兰怠,然后判斷是否需要failOver梦鉴,需要則執(zhí)行failOver,不需要則結束揭保。如果失效節(jié)點為null肥橙,則獲取所有該jobInstance的分片項,創(chuàng)建需要failover的分片項{jobName}/leader/failover/items/{index}秸侣,判斷是否需要failover存筏,需要則執(zhí)行failover,不需要則結束塔次。

 
class JobCrashedJobListener extends AbstractJobListener {

  @Override
  protected void dataChanged(final String path, final Type eventType, final String data) {
    if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
      String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
      if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
        return;
      }
      List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
      if (!failoverItems.isEmpty()) {
        for (int each : failoverItems) {
          // /{jobName}/sharding/{index}/failOver
          failoverService.setCrashedFailoverFlag(each);
         //如果需要失效轉移則失效轉移
          failoverService.failoverIfNecessary();
        }
      } else {
        for (int each : shardingService.getShardingItems(jobInstanceId)) {
          //將所有該作業(yè)的實例都置為failOver。
          failoverService.setCrashedFailoverFlag(each);
          //如果需要失效轉移則失效轉移
          failoverService.failoverIfNecessary();
        }
      }
    }
  }
}

public void setCrashedFailoverFlag(final int item) {
  if (!isFailoverAssigned(item)) {
    //創(chuàng)建節(jié)點 /{jobName}/leader/failover/items/{index}
    jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
  }
}

而在失效轉移過程中名秀,需要判斷是否需要失效轉移励负,判斷是否/{jobName}/leader/failover/items/{index}該節(jié)點存在,且該job不是處于run狀態(tài)匕得,然后選舉一個主節(jié)點/{jobName}/leader/failover/latch,然后由主節(jié)點回調继榆。

public void failoverIfNecessary() {
  if (needFailover()) {
    //選舉主節(jié)點
    jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
  }
}

public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {            
  try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {         
    latch.start();                                                                                   
    latch.await();                                                                                   
    callback.execute();                                                                              
    //CHECKSTYLE:OFF                                                                                     
  } catch (final Exception ex) {                                                                       
    //CHECKSTYLE:ON                                                                                      
    handleException(ex);                                                                             
  }                                                                                                    
}                                                                                                        
                                                                                                          

而在回調過程中,則將分片參數(shù)綁定/{jobName}/sharding/{index}/failover綁定jobInstanceId汁掠。然后將{jobName}/leader/failover/items/{index}刪除略吨。每次觸發(fā)一個分片,若有多個分片考阱,則for循環(huán)處理了翠忠,這里是根據(jù)每個分片做處理,所以這里只需要拿到一個分片處理即可乞榨,再去執(zhí)行triggerJob秽之,執(zhí)行作業(yè)。

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
  @Override
  public void execute() {
    if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
      return;
    }
    int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
    log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
    jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
    // TODO 不應使用triggerJob, 而是使用executor統(tǒng)一調度
    JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
    if (null != jobScheduleController) {
      jobScheduleController.triggerJob();
    }
  }
}

如果所有節(jié)點都處于run狀態(tài)吃既,則在作業(yè)的執(zhí)行完成考榨,會去主動調用jobFacade.failoverIfNecessary();這樣就能保證上面所說的失效轉移功能,在某臺服務器執(zhí)行完畢后主動抓取未分配的分片鹦倚,并且在某臺服務器下線后主動尋找可用的服務器執(zhí)行任務河质。

當非作業(yè)運行時,系統(tǒng)crached或者說修改了分片參數(shù), 會設置重新分片掀鹅。

class ShardingTotalCountChangedJobListener extends AbstractJobListener {

  @Override
  protected void dataChanged(final String path, final Type eventType, final String data) {
    if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
      int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
      //當分片參數(shù)修改時散休,會重新分片
      if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
        //重新分片
        shardingService.setReshardingFlag();
        JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
      }
    }
  }
}

class ListenServersChangedJobListener extends AbstractJobListener {

  @Override
  protected void dataChanged(final String path, final Type eventType, final String data) {、
    //當作業(yè)未關閉淫半,且作業(yè)實例變化 或者是服務器狀態(tài)發(fā)生變化
    if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
      shardingService.setReshardingFlag();
    }
  } 
  //如果系統(tǒng)crashed時溃槐,作業(yè)實例節(jié)點為NODE.REMOVED事件, 也就是會重新分片科吭。
  private boolean isInstanceChange(final Type eventType, final String path) {
    return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
  }

  private boolean isServerChange(final String path) {
    return serverNode.isServerPath(path);
  }
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末昏滴,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子对人,更是在濱河造成了極大的恐慌谣殊,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件牺弄,死亡現(xiàn)場離奇詭異姻几,居然都是意外死亡,警方通過查閱死者的電腦和手機势告,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門蛇捌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人咱台,你說我怎么就攤上這事络拌。” “怎么了回溺?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵春贸,是天一觀的道長。 經(jīng)常有香客問我遗遵,道長萍恕,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任车要,我火速辦了婚禮允粤,結果婚禮上,老公的妹妹穿的比我還像新娘翼岁。我一直安慰自己维哈,他們只是感情好,可當我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布登澜。 她就那樣靜靜地躺著阔挠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪脑蠕。 梳的紋絲不亂的頭發(fā)上购撼,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天跪削,我揣著相機與錄音,去河邊找鬼迂求。 笑死碾盐,一個胖子當著我的面吹牛,可吹牛的內容都是我干的揩局。 我是一名探鬼主播毫玖,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼凌盯!你這毒婦竟也來了付枫?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤驰怎,失蹤者是張志新(化名)和其女友劉穎阐滩,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體县忌,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡掂榔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了症杏。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片装获。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖厉颤,靈堂內的尸體忽然破棺而出穴豫,到底是詐尸還是另有隱情,我是刑警寧澤走芋,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布绩郎,位于F島的核電站潘鲫,受9級特大地震影響翁逞,放射性物質發(fā)生泄漏。R本人自食惡果不足惜溉仑,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一挖函、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧浊竟,春花似錦怨喘、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至后频,卻和暖如春梳庆,著一層夾襖步出監(jiān)牢的瞬間暖途,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工膏执, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留驻售,地道東北人。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓更米,卻偏偏與公主長得像欺栗,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子征峦,可洞房花燭夜當晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內容