tbschedule源碼解讀
tbschedule部署包括兩部分是越,一個是負責(zé)配置管理的后臺程序,一個是客戶端接入包倚评,這兩個程序依賴zk進行信息交互。
zk數(shù)據(jù)的大致結(jié)構(gòu)
factory部分:
/app1/factory
/app1/factory/facotoryUUID1
/app1/strategy
/app1/strategy/strategy1
/app1/strategy/strategy1/factoryUUID1
可以有多個facotory盔性,每個factory對應(yīng)一個客戶端啟動的TBScheduleManagerFactory
實例呢岗,每個JVM可以有多個factory實例,factory實例也可以存在于不同的JVM中悉尾。
strategy是在后臺配置的任務(wù)策略挫酿,每個factory啟動時候回去檢查自己能處理哪幾個strategy,如果能處理則在/app1/strategy/strategy1/
路徑下注冊自己早龟,注冊的這個信息在tbschedule源碼里叫做FactoryRunningInfo
。
ScheduleServer部分:
/app1/baseTaskType
/app1/baseTaskType/task1
/app1/baseTaskType/task1/task1
/app1/baseTaskType/task1/task1/server
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1
/app1/baseTaskType/task1/task1/taskItem
/app1/baseTaskType/task1/task1/taskItem/taskItem1
/app1/baseTaskType/task1/task1/taskItem/taskItem1/cur_server
/app1/baseTaskType/task1/task1/taskItem/taskItem1/deal_desc
/app1/baseTaskType/task1/task1/taskItem/taskItem1/parameter
/app1/baseTaskType/task1/task1/taskItem/taskItem1/req_server
/app1/baseTaskType/task1/task1/taskItem/taskItem1/sts
task1是在后臺配置的任務(wù)壹店。
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1
表示可以用來處理任務(wù)的調(diào)度器芝加,每個factory實例可以有多個ScheduleServer實例。
/app1/baseTaskType/task1/task1/taskItem
表示配置任務(wù)時老赤,每個任務(wù)可以拆分成幾個小的任務(wù)項制市。該節(jié)點的子節(jié)點,表示這個任務(wù)項運行時的信息开财,例如cur_server
表示這個taskItem正在被哪個ScheduleServer處理误褪。這些在tbschedule源碼里也叫作runningInfo。
核心類圖
TBScheduleManagerFactory
factory實例對象历葛,管理這個factory內(nèi)部所有的事情嘀略。ZKManager
負責(zé)與zk之間的連接,數(shù)據(jù)交換帜羊。IScheduleDataManager
負責(zé)/app1/baseTaskType
及其子節(jié)點所有數(shù)據(jù)模型維護讼育。ScheduleDataManger4ZK
負責(zé)/app1/factory``/app1/strategy
及其字節(jié)點數(shù)據(jù)模型維護。IStrategyTask
每個實例代表一個線程組奶段,每個strategy可對應(yīng)多個IStrategyTask
實例,來真正處理配置的任務(wù)扛伍。
關(guān)于這幾個類的組合關(guān)系如下圖:
一個Factory處理多個strategy词裤,每個strategy下有多個
IStrategyTask
對象。TBScheduleManager
實現(xiàn)IStrategyTask
接口逆航,一個TBScheduleManager
實例跟ScheduleServer
渔肩、ScheduleProcessor
、IScheduleTaskDeal
的關(guān)系都是一比一的關(guān)系。ScheduleServer
是針對某一個task的的調(diào)度器撑帖。IScheduleTaskDeal
是我們自己代碼里需要實現(xiàn)的任務(wù)對象澳眷。ScheduleProcessor
是處理任務(wù)的多線程任務(wù)處理器,代表一個線程組衷敌⊥氐桑可以包含多個線程,線程的最大數(shù)量取決于后臺配置的task身上的threadNum
字段面氓。一個Factory有多個
IStrategyTask
的原因是沟堡,任務(wù)需要分片處理,每個分片對應(yīng)一個IStrategyTask
實例航罗。一個
ScheduleProcessor
有多個Thread的原因是,一個任務(wù)分片下一次可以取出多個任務(wù)柏锄,開啟多線程可以并發(fā)處理這些任務(wù)复亏。
初始化流程
整個初始化過程大量使用Thread缔御、Timer,很多工作都是異步進行的耕突,且這些線程之間通過了狀態(tài)對象、鎖等方式進行了協(xié)調(diào)炕泳。
整個初始化過程粗略來看包括以下幾步:
- 創(chuàng)建ZKManager對象
- 啟動初始化線程
InitalThread
上祈,然后立即返回
接著便是InitalThread
異步做的初始化工作: - 準(zhǔn)備好ZKManager浙芙、ScheduleDataManager4ZK籽腕、ScheduleStrategyDataManager4ZK對象
- 啟動定時Timer對象
ManagerFactoryTimerTask
接著便是ManagerFactoryTimerTask
定時執(zhí)行的工作,主要是去掃描strategy配置晤锥,重新分配factory去處理這些strategy廊宪。分配完factory女轿,會創(chuàng)建StrategyTask進行任務(wù)的處理。
factory刷新工作詳解
整個過程源碼入口在ManagerFactoryTimerTask#run()
中傅寡,而主要的邏輯集中在TBScheduleManagerFactory#refresh()
北救。這里不去關(guān)心stop factory的逆向流程,只來看正向流程托启,見TBScheduleManagerFactory #reRegisterManagerFactory
攘宙。
- 遍歷strategy,重新計算factory實例跟strategy的匹配關(guān)系
- 找到當(dāng)前factory實例不能處理的strategy蹭劈,并停止掉正在運行的
StrategyTask
- 遍歷跟當(dāng)前factory實例相關(guān)的strategy铺韧,選舉出每個strategy的leader factory實例,由leader重新計算每個factory實例能夠分到的reqNum哈打,即根據(jù)strategy身上的
assignNum``numOfSingleServer
,將assignNum
平分給每個factory實例患雏。 - 調(diào)整當(dāng)前factory實例分配到每個strategy的的StrategyTask的數(shù)量罢维,確保數(shù)量等于上一步分配給自己的數(shù)量丙挽。
factory線程組數(shù)量分配算法
見ScheduleUtil#assignTaskNumber
/**
* 分配任務(wù)數(shù)量
* @param serverNum 總的服務(wù)器數(shù)量
* @param taskItemNum 任務(wù)項數(shù)量
* @param maxNumOfOneServer 每個server最大任務(wù)項數(shù)目
* @return
*/
public static int[] assignTaskNumber(int serverNum,int taskItemNum,int maxNumOfOneServer){
int[] taskNums = new int[serverNum];
int numOfSingle = taskItemNum / serverNum;
int otherNum = taskItemNum % serverNum;
//20150323 刪除, 任務(wù)分片保證分配到所有的線程組數(shù)上颜阐。 開始
// if (maxNumOfOneServer >0 && numOfSingle >= maxNumOfOneServer) {
// numOfSingle = maxNumOfOneServer;
// otherNum = 0;
// }
//20150323 刪除, 任務(wù)分片保證分配到所有的線程組數(shù)上吓肋。 結(jié)束
for (int i = 0; i < taskNums.length; i++) {
if (i < otherNum) {
taskNums[i] = numOfSingle + 1;
} else {
taskNums[i] = numOfSingle;
}
}
return taskNums;
}
TBScheduleManagerStatic的初始化流程
- 找到task配置的用戶實現(xiàn)的
IScheduleTaskDeal
對象 - 將當(dāng)前ScheduleServer實例注冊到
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1
位置 - 啟動心跳Timer
HearBeatTimerTask
- 啟動初始化線程
心跳Timer
這里主要做的事情就是重新將taskItem分配到每個SchedueServer是鬼,源碼位置在TBScheduleManagerStatic#assignScheduleTask()
。首先選舉出當(dāng)前ScheduleServer對應(yīng)的task對應(yīng)的所有ScheduleServer實例均蜜,選舉出一個leader囤耳,由leader進行分配工作。
- 等到初始化線程完成initialRunningInfo的工作
- clearTaskItem充择,遍歷所有taskItem,查看對應(yīng)的cur_server是否還能找到宰僧,找不到則將cur_server置為null
- assignTaskItem铃剔,給每個taskItem分配合適的ScheduleServer實例。
初始化線程
- initialRunningInfo凤类,由當(dāng)前task的leader ScheduleServer實例初始化這個task下所有的taskItem子節(jié)點的數(shù)據(jù)普气,此時還沒有分配每個taskItem由哪個ScheduleServer實例執(zhí)行(見心跳Timer)
- getCurrentScheduleTaskItemListNow,重新加載當(dāng)前ScheduleServer能處理的taskItem項目
- computerStart现诀,創(chuàng)建兩個Timer,一個用來計算任務(wù)下次執(zhí)行開始時間坐桩,一個用來計算任務(wù)下次終止執(zhí)行時間封锉。停止跟恢復(fù)通過
TBScheduleManager
身上的isPauseSchedule
字段來標(biāo)識膘螟。 - 恢復(fù)的時候去創(chuàng)建
TBScheduleProcessorSleep``TBScheduleProcessorNotSleep
對象碾局;停止的時候净当,會將已經(jīng)在執(zhí)行的任務(wù)處理完余爆,但是緩存在隊列中待執(zhí)行的任務(wù)將被丟棄尖阔。
TBScheduleProcessorSleep多線程工作原理
啟動task配置的threadNum數(shù)量的線程去處理任務(wù)庶诡。由其中某一個線程去獲取任務(wù)全陨,將入taskList隊列中甚颂,所有的線程從這個隊列中獲取任務(wù)執(zhí)行秀菱,如果是Multi任務(wù)衍菱,可以一次取多個任務(wù)執(zhí)行。在一個線程獲取任務(wù)的過程中脊串,其他線程處于休眠狀態(tài),任務(wù)獲取完畢喚醒其他線程放闺。獲取任務(wù)代碼在TBScheduleProcessorSleep#loadScheduleData
缕坎,每次獲取都是調(diào)用一次IScheduleTaskDeal
對象selectTasks
方法獲取一批任務(wù)放到taskList中。
兩次loadScheduleData
有一個休眠時間匾寝,即在task上配置的SleepTimeInterval荷腊。
一旦TBScheduleProcessorSleep啟動了,會一直循環(huán)執(zhí)行女仰,知道PauseTimer讓其停止,如果你沒有配置結(jié)束時間码倦,則不會停止,而是一直運行勿璃;也可以通過后臺配置將任務(wù)停止推汽。
總結(jié)
tbschdule通過任務(wù)分片,將一個任務(wù)分配給多個線程組(即ScheduleServer實例)執(zhí)行莲组,這些線程組可以分布在相同或者不同的JVM上暖夭。而每個線程組支持多線程處理某一個分片的任務(wù)。
tbschedule同時支持失效任務(wù)轉(zhuǎn)移功能迈着,并且可以通過管理后臺對任務(wù)進行調(diào)度管理裕菠。
不過官方文檔實在太少。
參考:
tbschedule
關(guān)于TbSchedule任務(wù)調(diào)度管理框架的整合部署