Name | 描述 |
---|---|
JobRegistry | job注冊表 |
GuaranteeService | 保證分布式任務全部開始和結束狀態(tài)的服務 |
ElasticJobListener | 彈性化分布式作業(yè)監(jiān)聽器接口 |
JobScheduler | 作業(yè)調度器. |
JobNodeStorage | 作業(yè)節(jié)點數(shù)據(jù)訪問類 |
JobScheduleController | 作業(yè)調度控制器,其實就是quartz的scheduler |
config 節(jié)點
代表了這個job的配置信息阵谚,如果app啟動的時候沒有設置override=true && zk上的配置不為空,那么就使用zk已經(jīng)存在的zk信息,這一點要特別注意,如果改了配置信息斤程,但是沒有設置override就會出問題,但是有些信息也可以調節(jié)console里面的配置菩混,因為他是以zk為準的忿墅。
配置參數(shù)
Name | 描述 |
---|---|
monitorExecution | 默認是true,當job觸發(fā)的時候如果上一次還有分片在運行中則忽略掉本次運行 |
maxTimeDiffSeconds | 任務執(zhí)行機器和zk機器的時間差沮峡,如果設置了不是-1疚脐,那么兩者誤差要在這個范圍內,否則啟動報錯帖烘,但是只是記到了錯誤日志亮曹,啟動還是可以繼續(xù)的 |
-
每個sharding節(jié)點下的子節(jié)點有哪些橄杨?
- instance 節(jié)點秘症,記錄了這個分片執(zhí)行的instance的ip、pid
- running
- misfire
- disabled
- failover 節(jié)點式矫,表示這個分片原來是運行在實例上的的后來失敗被轉義到別的實例上的
-
重新分片
在每次執(zhí)行任務的時候乡摹,有一個步驟是獲取分片上下文,這個時候會檢查是否需要重新分片采转。- 如何確定是否需要分片呢聪廉?
當監(jiān)聽到分片總數(shù)變化、實例數(shù)量變化故慈、分布式作業(yè)不一致板熊、注冊作業(yè)啟動信息的時候會創(chuàng)建需要重新分片的節(jié)點 :leader/sharding/necessary
只有存在這個節(jié)點,才可能開始下面的分片邏輯:
- 如何確定是否需要分片呢聪廉?
分片
- 如果當前實例是leader節(jié)點察绷,他會等待所有的分片都執(zhí)行完成后干签,創(chuàng)建 /leader/sharding/processing 節(jié)點,表示分片動作正在執(zhí)行中拆撼,那么非leader節(jié)點看到存在這個節(jié)點就會一直等著不會繼續(xù)執(zhí)行容劳,leader節(jié)點首先重置所有的分片信息,例如原來是 0闸度,1竭贩,2三個分片,現(xiàn)在分片0莺禁,1 那么就會把2刪掉留量,接著使用分片策略,最后形成
Map<JobInstance, List<Integer>> 這個結構,最后將每個分片下面的instance節(jié)點的值寫為對應實例的id肪获。最后刪除/leader/sharding/necessary 和 /leader/sharding/processing 節(jié)點
- 如果當前實例是leader節(jié)點察绷,他會等待所有的分片都執(zhí)行完成后干签,創(chuàng)建 /leader/sharding/processing 節(jié)點,表示分片動作正在執(zhí)行中拆撼,那么非leader節(jié)點看到存在這個節(jié)點就會一直等著不會繼續(xù)執(zhí)行容劳,leader節(jié)點首先重置所有的分片信息,例如原來是 0闸度,1竭贩,2三個分片,現(xiàn)在分片0莺禁,1 那么就會把2刪掉留量,接著使用分片策略,最后形成
- 如果當前實例不是leader節(jié)點寝凌,就一直等到分片完成在繼續(xù)執(zhí)行,然后
misfire
misfire 在quartz里的原因是cron設置的執(zhí)行間隔很小孝赫,但是任務執(zhí)行的時間很長较木,導致下一次觸發(fā)執(zhí)行的時候上一次的任務還沒有執(zhí)行完成,這樣本次任務在執(zhí)行的時候發(fā)現(xiàn)sharding下面還有running節(jié)點青柄,就會設置這個sharding下的misfire節(jié)點 伐债,然后跳過本次執(zhí)行-
failover
failover 功能指的是當一個實例在執(zhí)行過程中宕機,可以把分片轉移到其他的實例上運行什么時候需要failover致开?
節(jié)點 /leader/failover/items 存在 并且 下面的子節(jié)點不為空 并且當前任務沒有在運行狀態(tài)
創(chuàng)建臨時節(jié)點 sharding/xx/failover
同時刪除 /leader/failover/items/xx
立即啟動作業(yè) scheduler.triggerJob(jobDetail.getKey());/leader/failover/items 節(jié)點以及子節(jié)點表示失效的節(jié)點峰锁,這些節(jié)點什么時候創(chuàng)建的呢?
每次執(zhí)行完成双戳,刪除failover節(jié)點
-
監(jiān)聽器
app啟動的時候會開啟這些監(jiān)聽器electionListenerManager
-
shardingListenerManager
- ShardingTotalCountChangedJobListener
分片數(shù)量變化虹蒋,得到通知,更新本地的分片數(shù)量緩存飒货,同時創(chuàng)建/leader/sharding/necessary節(jié)點 - ListenServersChangedJobListener
如果是server數(shù)量變化或者instance數(shù)量變化就設創(chuàng)建/leader/sharding/necessary節(jié)點
- ShardingTotalCountChangedJobListener
-
failoverListenerManager
監(jiān)聽/jobName , 節(jié)點或者數(shù)據(jù)變化會執(zhí)行 魄衅,注冊了兩個zk listener-
JobCrashedJobListener
- 監(jiān)聽節(jié)點刪除事件,檢查 sharding 下的子節(jié)點 instance 塘辅,如果instaceid和掛掉的機器instanceid是一樣的晃虫,這樣就搜集了掛掉實例上運行的所有分片,如果 /sharding/xx/failover 節(jié)點沒有創(chuàng)建扣墩,那么就創(chuàng)建 /leader/failover/items/xx 節(jié)點哲银。
然后創(chuàng)建臨時節(jié)點 sharding/xx/failover
同時刪除 /leader/failover/items/xx
立即啟動作業(yè) scheduler.triggerJob(jobDetail.getKey());
- 監(jiān)聽節(jié)點刪除事件,檢查 sharding 下的子節(jié)點 instance 塘辅,如果instaceid和掛掉的機器instanceid是一樣的晃虫,這樣就搜集了掛掉實例上運行的所有分片,如果 /sharding/xx/failover 節(jié)點沒有創(chuàng)建扣墩,那么就創(chuàng)建 /leader/failover/items/xx 節(jié)點哲银。
FailoverSettingsChangedJobListener
監(jiān)聽config配置節(jié)點,如果配置改變了(通過控制臺改變)呻惕,如果配置不再支持failover了荆责,那么就刪掉所有的 sharding/xx/failover 節(jié)點
-
- monitorExecutionListenerManager
- shutdownListenerManager
- triggerListenerManager
- rescheduleListenerManager
- guaranteeListenerManager
- regCenterConnectionStateListener
- 任務狀態(tài)
- jobStatusTraceEvent
TASK_STAGING
TASK_RUNNING,
TASK_FINISHED,
TASK_KILLED,
TASK_LOST,
TASK_FAILED,
TASK_ERROR,
TASK_DROPPED,
TASK_GONE,
TASK_GONE_BY_OPERATOR,
TASK_UNREACHABLE,
TASK_UNKNOWN
INSERT INTO `JOB_STATUS_TRACE_LOG` (`id`, `job_name`, `original_task_id`, `task_id`, `slave_id`, `source`, `execution_type`, `sharding_item`, `state`, `message`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
JobStatusTraceEvent
JobStatusTraceEvent
- 線程池
new ThreadPoolExecutor(
threadSize,
threadSize,
5L,
TimeUnit.MINUTES,
workQueue,
new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
threadPoolExecutor.allowCoreThreadTimeOut(true);
1. 固定大小線程池
2. 允許core線程超時
3. 線程數(shù)量 = Runtime.getRuntime().availableProcessors() * 2
-
如果我們自定義的業(yè)務邏輯報錯會怎么處理
在框架里會catch住,將錯誤堆棧信息提交到eventbus亚脆,并記錄到日志做院。
提供的擴展點
- 默認的線程池是固定大小、線程數(shù)量是cpu個數(shù)的2倍型酥,對于cpu密集和IO密集不能做區(qū)分山憨,因此提供了線程池的擴展點
app啟動
- 從zk中獲取到config
- xx
- 通過jobProperties 可以設置任務執(zhí)行的線程池和異常處理策略