構(gòu)建Topology并在本地測(cè)試后并鸵,我們就可以將工程打包為jar包纽什,并通過jstorm的jar命令提交到集群合溺。這個(gè)過程使用了thrift的遠(yuǎn)程調(diào)用凑队,相關(guān)技術(shù)可以參照http://matt33.com/2016/04/07/thrift-learn/#Thrift%E4%B9%8BHello-World
1.提交命令
將工程打?yàn)閖ar包之后,可以通過下面的命令提交到j(luò)storm集群中忌锯。在提交之前伪嫁,我們需要先啟動(dòng)zookeeper,nimbus和supervisor偶垮。
jstorm jar target/sequence-split-merge-1.0.5-jar-with-dependencies.jar
com.alipay.dw.jstorm.example.sequence.SequenceTopology sequence_test
2.程序分析
2.1 jstorm腳本
jstorm命令其實(shí)是一個(gè)python腳本张咳,位于jstorm-server項(xiàng)目bin/jstorm.py中。這個(gè)腳本的主要工作是提供了一些子命令似舵,最終會(huì)調(diào)用指定的jar包內(nèi)的main方法脚猾。
在main方法中,會(huì)根據(jù)傳入的參數(shù)查找對(duì)應(yīng)的方法砚哗。例如龙助,jstrom jar
命令就是jar對(duì)應(yīng)的jar方法。在這個(gè)腳本中包含了很多子命令蛛芥,可以看到有如下多個(gè)子命令泌参,用來控制jstorm集群的行為脆淹。
COMMANDS = {"jar": jar, "kill": kill, "nimbus": nimbus, "zktool": zktool,
"drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
"remoteconfvalue": print_remoteconfvalue, "classpath": print_classpath,
"activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage}
jar方法的主要邏輯是拼裝java命令并調(diào)用
def jar(jarfile, klass, *args):
childopts = "-Dstorm.jar=" + jarfile + (" -Dstorm.root.logger=INFO,stdout -Dlog4j.configuration=File:%s/conf/aloha_log4j.properties" %JSTORM_DIR)
exec_storm_class(
klass,
jvmtype="-client -Xms256m -Xmx256m",
extrajars=[jarfile, CONF_DIR, JSTORM_DIR + "/bin", LOG4J_CONF],
args=args,
childopts=childopts)
例如,本文jstorm jar
的命令最終會(huì)執(zhí)行如下java命令沽一,我們需要到我們編寫main方法中的StormSubmitter查看具體邏輯。
java -client -Xms256m -Xmx256m
-Djstorm.home=/Users/shishengjie/software/jstorm-0.9.1
-Dstorm.options=
-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
-Dstorm.jar=target/sequence-split-merge-1.0.5-jar-with-dependencies.jar
-Dstorm.root.logger=INFO,stdout
-Dlog4j.configuration=File:/Users/shishengjie/software/jstorm-0.9.1/conf/aloha_log4j.properties
-cp /.../....jar:target/sequence-split-merge-1.0.5-jar-with-dependencies.jar:/Users/shishengjie/.jstorm:/Users/shishengjie/software/jstorm-0.9.1/bin:/Users/shishengjie/software/jstorm-0.9.1/conf/jstorm.log4j.properties
com.alipay.dw.jstorm.example.sequence.SequenceTopology
"sequence_test"
2.2 StormSubmitter
StormSubmitter類定義在jstorm-client工程中漓糙,submitTopology方法用來向運(yùn)行中的jstorm集群提交Topology铣缠。
public static void submitTopology(String name, Map stormConf,
StormTopology topology, SubmitOptions opts)
throws AlreadyAliveException, InvalidTopologyException,
TopologyAssignException {}
提交時(shí)我們可以看到,需要指定拓?fù)涿Q和配置昆禽,StormTopology與SubmitOptions類的實(shí)例蝗蛙。前面幾個(gè)參數(shù)我們都知道是如何設(shè)置的,SubmitOptions是thrift定義的一個(gè)對(duì)象醉鳖,用來表示拓?fù)涫欠窦せ睢?/p>
struct SubmitOptions {
1: required TopologyInitialStatus initial_status;
}
enum TopologyInitialStatus {
ACTIVE = 1,
INACTIVE = 2
}
// java中使用
SubmitOptions submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
提交拓?fù)涞闹饕壿嫞?/p>
- 參數(shù)的處理捡硅,開發(fā)者設(shè)置的stormConf并不完整,因此需要進(jìn)行處理盗棵。首先會(huì)校驗(yàn)stormConf格式是否正確壮韭,stormConf需要是一個(gè)Json序列化格式,然后從命令行讀取
storm.options
屬性的內(nèi)容纹因,覆蓋stormConf喷屋;接下來分別讀取defaults.yaml
和從storm.conf.file
指定的storm配置文件中讀取配置(如果沒有指定,則從classpath中查找storm.yaml
文件瞭恰,jstorm-server中有一個(gè)名為storm.yaml的文件屯曹,里面是默認(rèn)配置)。讀取完文件的配置后惊畏,再次讀取命令行指定的配置恶耽,保存為conf,并將stormConf的內(nèi)容覆蓋conf颜启,這樣就完成了補(bǔ)充配置的工作偷俭,stormConf是用戶指定配置的,conf是補(bǔ)充后的完整配置农曲。最后社搅,設(shè)置用戶分組參數(shù),將stormConf序列化為json字符串乳规。
2.使用NimbusClient的靜態(tài)方法從conf中獲取NimbusClient實(shí)例
3.調(diào)用NimbusClient的getClusterInfo方法向服務(wù)器獲取集群信息形葬,校驗(yàn)指定的拓?fù)涿Q是否已經(jīng)存在。
4.使用NimbusClient將storm.jar
指定的jar包上傳到服務(wù)器暮的,storm.jar
是在jstorm.py中設(shè)置的笙以。
5.使用NimbusClient的submitTopology方法提交拓?fù)洹?/li>
至此,jstorm完成了jar包和拓?fù)涞奶峤欢潮纭N覀兛梢钥吹饺渴峭ㄟ^NimbusClient來完成的猖腕。有client端就有server端拆祈,這就涉及到了jsorm的架構(gòu)。storm命令的python腳本都是作為client端發(fā)送請(qǐng)求的倘感,而使用jstorm nimbus
啟動(dòng)的進(jìn)程則是服務(wù)端放坏,用來處理client請(qǐng)求。
2.3 Nimbus
jstorm的nimbus命令會(huì)啟動(dòng)一個(gè)服務(wù)器進(jìn)程老玛,其提供了很多thrift服務(wù)淤年,用來管理集群中拓?fù)洌蝿?wù)蜡豹,worker等狀態(tài)麸粮。Nimbus也定義在strom.thrift中,聲明了其提供的很多服務(wù)镜廉。NimbusClient是jstorm封裝的客戶端弄诲,用來向服務(wù)器發(fā)出請(qǐng)求。
下面是使用thrift定義的nimbus提供的服務(wù)娇唯。
service Nimbus {
// 拓?fù)湎嚓P(guān)
void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: TopologyAssignException tae);
void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3:TopologyAssignException tae);
void killTopology(1: string name) throws (1: NotAliveException e);
void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
void activate(1: string name) throws (1: NotAliveException e);
void deactivate(1: string name) throws (1: NotAliveException e);
void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);
// 上傳文件
string beginFileUpload();
void uploadChunk(1: string location, 2: binary chunk);
void finishFileUpload(1: string location);
// 下載文件
string beginFileDownload(1: string file);
//can stop downloading chunks when receive 0-length byte array back
binary downloadChunk(1: string id);
// 獲取集群狀態(tài)和拓?fù)錉顟B(tài)
string getNimbusConf();
ClusterSummary getClusterInfo();
TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
SupervisorWorkers getSupervisorWorkers(1: string host) throws (1: NotAliveException e);
string getTopologyConf(1: string id) throws (1: NotAliveException e);
StormTopology getTopology(1: string id) throws (1: NotAliveException e);
StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}
NimbusClient的獲取
NimbusClient繼承了ThriftClient類齐遵,創(chuàng)建時(shí)傳入了conf
作為配置。主要的初始化方法在ThriftClient的構(gòu)造方法中视乐。
- 獲取master地址:初始化時(shí)洛搀,首先會(huì)根據(jù)conf獲取zookeeper的配置,然后創(chuàng)建Curator客戶端佑淀,由于nimbus在啟動(dòng)后創(chuàng)建
/jstorm/master
節(jié)點(diǎn)并將master的host和ip寫入節(jié)點(diǎn)留美,nimbus的client啟動(dòng)時(shí)會(huì)對(duì)該節(jié)點(diǎn)進(jìn)行檢查,之后會(huì)監(jiān)聽這個(gè)節(jié)點(diǎn)伸刃,發(fā)送變化時(shí)會(huì)flushClient谎砾。 - 創(chuàng)建客戶端:主要的邏輯在flushClient中,會(huì)從節(jié)點(diǎn)中讀取host和port捧颅,然后使用thrift的接口創(chuàng)建連接景图。
- 創(chuàng)建完成后,NimbusClient通過flush方法獲取了與服務(wù)端通信的
Nimbus.Client
碉哑。
2.4 NimbusServer
由于客戶端所做的事情只是調(diào)用挚币,具體的邏輯是在服務(wù)端實(shí)現(xiàn)的,因此有必要對(duì)服務(wù)端進(jìn)行分析扣典。我們?cè)谑褂?code>jstorm nimbus命令時(shí)妆毕,最終會(huì)調(diào)用com.alibaba.jstorm.daemon.nimbus.NimbusServer
,里面有main方法贮尖,可以啟動(dòng)一個(gè)服務(wù)器進(jìn)程笛粘。
public static void main(String[] args) throws Exception {
// read configuration files
@SuppressWarnings("rawtypes")
Map config = Utils.readStormConfig();
NimbusServer instance = new NimbusServer();
INimbus iNimbus = new DefaultInimbus();
instance.launchServer(config, iNimbus);
}
服務(wù)器的相關(guān)設(shè)計(jì)我們?cè)谙乱黄臋n中介紹,現(xiàn)在主要是對(duì)服務(wù)器處理提交拓?fù)淙蝿?wù)的處理進(jìn)行分析。按照thrift的定義薪前,我們?cè)?code>initThrift中找到了處理遠(yuǎn)程調(diào)用的類是ServiceHandler
润努。
在ServiceHandler
的submitTopologyWithOpts方法中,提交topology示括。
ServiceHandler的主要邏輯包括:生成標(biāo)準(zhǔn)化的Topology實(shí)例并注冊(cè)到zk上铺浇;將Topology需要的jar包、Topology實(shí)例和配置通過復(fù)制和序列化保存到本地路徑上垛膝;創(chuàng)建Task并注冊(cè)到zk上随抠,task節(jié)點(diǎn)的內(nèi)容為TaskInfo即任務(wù)對(duì)應(yīng)的組件名稱;包裝一個(gè)TopologyEvent對(duì)象提交給TopologyAssign線程處理繁涂,由其完成任務(wù)的資源分配。
- checkTopologyActive 檢查topologyName是否存在二驰,NimbusData中包含一個(gè)用于處理zookeeper的類StormZkClusterState扔罪,首先從
/jstorm/topology
里面獲取激活狀態(tài)的拓?fù)淞斜恚闅v這個(gè)列表桶雀,如果節(jié)點(diǎn)名稱包含topologyName矿酵,則獲取節(jié)點(diǎn)內(nèi)部數(shù)據(jù),將其反序列化為StormBase類矗积,如果name一致全肮,說明已經(jīng)存在。 - NimbusData中保存了提交的總數(shù)量棘捣,對(duì)其加1辜腺;之后,構(gòu)造topologyId乍恐,為
topologyname-次序-時(shí)間戳
评疗。將NimbusData中jstorm的conf與提交的stormConf合并,由于配置被修改了茵烈,對(duì)提交的StormTopology對(duì)象重新構(gòu)造為一個(gè)標(biāo)準(zhǔn)化后的StormTopology百匆。 - setupStormCode對(duì)Nimbus所在服務(wù)器創(chuàng)建路徑
${storm.local.dir}/nimbus/stormdist/{topologyId}
,將上傳的jar包復(fù)制過來呜投,文件名為stormjar.jar加匈;將標(biāo)準(zhǔn)化后的StormTopology序列化到{storm.local.dir}/nimbus/{topologyId}/stormcode.ser
文件中,將標(biāo)準(zhǔn)化后的StormTopology序列化到{storm.local.dir}/nimbus/{topologyId}/stormconf.ser
文件中仑荐。 - setupZkTaskInfo針對(duì)bolt和spout組件生成TaskInfo雕拼,首先在zk上創(chuàng)建
/jstorm/taskbeats/{topoologyId}
,讀取stormconf.ser文件释漆,反序列化為stormConf悲没;讀取stormcode.ser文件,反序列化為StormTopology實(shí)例;然后為topology添加Acker示姿,Acker實(shí)現(xiàn)了IBolt甜橱,用來跟蹤消息處理情況并通知spout;再為topology中的所有組件添加一個(gè)名為__system
的stream栈戳。最后遍歷topology中的組件岂傲,根據(jù)設(shè)置的并發(fā)度,保存到taskToComponetId
中子檀,例如镊掖,spout并發(fā)度為2,那么會(huì)保存1->spout 2->spout
褂痰。有了這個(gè)TreeMap亩进,遍歷并創(chuàng)建TaskInfo,在zk上創(chuàng)建/jstorm/tasks/{topoologyId}/{taskId}
,將TaskInfo序列化保存到節(jié)點(diǎn)里面缩歪。例如归薛,spout會(huì)在創(chuàng)建兩個(gè)節(jié)點(diǎn),節(jié)點(diǎn)內(nèi)容為TaskInfo匪蝙。 - 為topology分配任務(wù)主籍,TopologyAssign是一個(gè)線程,會(huì)不斷地從queue中獲取TopologyAssignEvent對(duì)象并進(jìn)行任務(wù)分配逛球,所以在提交提交任務(wù)的時(shí)候千元,只需要?jiǎng)?chuàng)建TopologyAssignEvent對(duì)象并push到queue中。而實(shí)際的任務(wù)分配則是在TopologyAssign中完成颤绕。在NimbusServer啟動(dòng)時(shí)的initTopologyAssign方法中幸海,會(huì)啟動(dòng)
TopologyAssign
線程。
2.4 TopologyAssign
TopologyAssign線程的主要工作就是處理Topology的任務(wù)分配屋厘,主要邏輯在doTopologyAssignment
方法中涕烧。流程是先根據(jù)TopologyAssignEvent生成Assignment,然后將Assignment備份到zk中汗洒。在分析之前议纯,需要先對(duì)下面幾個(gè)對(duì)象有所了解
TopologyAssignContext
在處理分配拓?fù)渲埃覀円延械馁Y源是Topology實(shí)例和配置信息溢谤。TopologyAssignContext的目的是將分配拓?fù)渌璧臄?shù)據(jù)都維護(hù)起來瞻凤。TopologyAssignContext內(nèi)部保存了拓?fù)浞峙涞南嚓P(guān)上下文:
- assignType - 分配的類型,包括ASSIGN_TYPE_NEW-新建世杀,ASSIGN_TYPE_REBALANCE-重新平衡阀参,ASSIGN_TYPE_MONITOR-監(jiān)聽。
- rawTopology - 未被修改前的StormTopology實(shí)例瞻坝;
- stormConf - 拓?fù)涞呐渲茫?/li>
- oldAssignment - 拓?fù)湟呀?jīng)存在的Assignment實(shí)例蛛壳;
- SupervisorInfo - 當(dāng)前supervisor運(yùn)行信息;
- taskToComponent - task與組件名稱的映射;
- allTaskIds - 所有的task衙荐;
- deadTaskIds - 已經(jīng)不運(yùn)行的task捞挥;
- unstoppedTaskIds - 雖然活著但所在supervisor停止運(yùn)行的task
- cluster - storm集群中的SupervisorInfo
DefaultTopologyAssignContext
DefaultTopologyAssignContext繼承了TopologyAssignContext類,
- sysTopology - Storm會(huì)對(duì)rawTopology添加ack等機(jī)制忧吟,生成新的Topology實(shí)例
- sidToHostname - supervisorid -> hostname的映射
- hostToSid - hostname -> supervisorid的映射
- oldWorkerTasks - zk中記錄了Topology舊的Assignment砌函,內(nèi)部包含task的列表
- componentTasks - 組件名稱 -> taskid的映射
- unstoppedAssignments - 已經(jīng)停止的supervisor中的task
- totalWorkerNum - worker總數(shù),默認(rèn)大小為min({topologt.workers}溜族,所有組件的并發(fā)數(shù)之和)
- unstoppedWorkerNum - 已經(jīng)停止的supervisor中的task數(shù)量
- DISK_WEIGHT - 磁盤的權(quán)重
- CPU_WEIGHT - CPU的權(quán)重
- MEM_WEIGHT - 內(nèi)存的權(quán)重
- PORT_WEIGHT - 端口的權(quán)重
- TASK_ON_DIFFERENT_NODE_WEIGHT - task運(yùn)行在不同節(jié)點(diǎn)上的權(quán)重
- USE_OLD_ASSIGN_RATIO_WEIGHT - 使用舊的分配率的權(quán)重
- USER_DEFINE_ASSIGN_RATIO_WEIGHT - 使用用戶定義的分配率的權(quán)重
- DEFAULT_WEIGHT - 默認(rèn)的權(quán)重
DefaultTopologyScheduler
DefaultTopologyScheduler實(shí)現(xiàn)了IToplogyScheduler接口讹俊,IToplogyScheduler只有一個(gè)方法
Map<Integer, ResourceAssignment> assignTasks(TopologyAssignContext contex)
throws FailedAssignTopologyException;
也就是說DefaultTopologyScheduler的主要功能就是根據(jù)TopologyAssignContext為task分配資源,如指定所在的supervisor煌抒,并在Supervisor的資源池中為task分配所需的資源仍劈。assignTasks
方法是處理拓?fù)涞暮诵墓δ堋?/p>
ResourceAssignment
每個(gè)task會(huì)被分配給一個(gè)ResourceAssignment實(shí)例,ResourceAssignment包含了task所在supervisorId和磁盤寡壮,cpu耳奕,內(nèi)存這些資源所在的slot,以及端口port诬像。
- supervisorId - supervisor的id
- hostname - 所在的機(jī)器
- diskSlot - 磁盤的slot
- cpuSlotNum - cpu的slot數(shù)量
- memSlotNum - 內(nèi)存的slot數(shù)量
- port - 端口
Assignment
每個(gè)Topology都需要被nimbus分配給supervisor執(zhí)行,分配的元數(shù)據(jù)就保存在Assignment對(duì)象中闸婴。通過zk的節(jié)點(diǎn)/jstorm/assignments/{topologyid}
傳遞給Supervisor坏挠,Assignment內(nèi)部包括:
- nodeHost - 保存{supervisorid: hostname} -- 分配的supervisor
- taskStartTimeSecs - 每個(gè)task的啟動(dòng)時(shí)間,{taskid, taskStartSeconds
- masterCodeDir - topology的代碼路徑
- taskToResource - 每個(gè)task對(duì)應(yīng)的ResourceAssignment
處理流程
mkAssignment的prepareTopologyAssign
prepareTopologyAssign的主要作用是收集jstorm集群當(dāng)前的上下文數(shù)據(jù)邪乍,包括:拓?fù)涿Q降狠、標(biāo)準(zhǔn)化后的Topology、Topology配置庇楞、Supervisor列表榜配、task列表、已經(jīng)存在的Assignment吕晌。
- 從本地路徑讀取topology文件蛋褥,然后反序列化為StormTopology,保存到rawTopology中睛驳;再組裝topology的stormConf烙心;
- 然后,從zk上獲取所有Supervisor乏沸,如果沒有Supervisor存在淫茵,會(huì)拋出異常。
- 從zk上獲取task列表蹬跃,保存到TaskToComponent和AllTaskIds中匙瘪;有了拓?fù)涞膖ask列表之后,從zk中獲取拓?fù)涞腁ssignment信息。如果已經(jīng)存在了丹喻,則會(huì)根據(jù)task的心跳和Supervisor的運(yùn)行狀態(tài)薄货,區(qū)分出還在運(yùn)行中的和已經(jīng)停止運(yùn)行的task,分別保存到aliveTasks和unstoppedTasks中驻啤。具體邏輯是:從
/jstorm/taskbeats/{topoologyId}
節(jié)點(diǎn)下查找拓?fù)渲邪膖ask記錄到allTaskIds中菲驴;通過保持心跳的task記錄,可以將aliveTasks從allTaskIds過濾出來骑冗;由于Supervisor可能已經(jīng)停止運(yùn)行赊瞬,所以還需要找出來其包含的task,具體是通過zk中舊的Assignment中維護(hù)了task和其對(duì)應(yīng)的Supervisor(getTaskToResource的ResourceAssignment中)贼涩,如果Supervisor已經(jīng)不在運(yùn)行了巧涧,就將這些task保存到unstoppedTasks中。有了這些信息遥倦,deadTasks就是allTaskId減去aliveTasks的集合谤绳。 - 如果zk上沒有拓?fù)鋵?duì)應(yīng)的舊Assignment信息,就不需要找出deadTasks和unstoppedTasks袒哥,這兩個(gè)集合都為空了缩筛。
- 為Supervisor收集未使用的slot,zk上記錄了所有的Supervisor和Assignment堡称,而Assignment中記錄了使用的資源瞎抛,這樣就可以分配已使用的資源,剩余的就是未使用的資源却紧。首先遍歷zk上注冊(cè)的所有的Assignment桐臊,由于Assignment中包含了task與ResourceAssignment的映射,而ResourceAssignment中包含了task所在的Supervisor的id晓殊,因此可以獲取到每個(gè)task的SupervisorInfo断凶,SupervisorInfo中維護(hù)了Supervisor的資源池(cpu,內(nèi)存巫俺,磁盤和網(wǎng)絡(luò))认烁,在池中分配task所需的資源。
- 設(shè)置拓?fù)湟延械腁ssignment介汹,保存到OldAssignment中砚著,如果拓?fù)涞腁ssignment不存在,會(huì)從zk中獲取AssignmentBak痴昧。在這個(gè)過程中稽穆,也會(huì)設(shè)置AssignType的類型。
mkAssignment的IToplogyScheduler
收集完TopologyContext之后赶撰,就可以使用調(diào)度器來為任務(wù)分配資源了舌镶。IToplogyScheduler是默認(rèn)的用來調(diào)度拓?fù)涞念愔梗趇nit方法中被創(chuàng)建,使用的是DefaultTopologyScheduler類餐胀。這個(gè)類的作用是為task分配指定的資源哟楷。
分配過程為:
- 檢查分配類型是否為ASSIGN_TYPE_NEW、ASSIGN_TYPE_REBALANCE否灾、ASSIGN_TYPE_MONITOR三者之一卖擅;
- 在TopologyAssignContext的基礎(chǔ)上創(chuàng)建DefaultTopologyAssignContext,內(nèi)部包含了分配task資源需要的weight配置
- 如果是REBALANCE類型的墨技,需要先釋放掉已有的資源惩阶。context中保存了所有的SupervisorInfo,zk中記錄了原有的Assignment扣汪,內(nèi)部包含了task使用資源的情況断楷,這樣就可以對(duì)SupervisorInfo內(nèi)的資源池進(jìn)行釋放。
- 統(tǒng)計(jì)需要分配的task崭别,保存在needAssignTasks中冬筒。ASSIGN_TYPE_NEW類型的時(shí)候所有都需要分配;ASSIGN_TYPE_REBALANCE 需要分配(所有task 減去 已經(jīng)停止的supervisor的task)茅主;ASSIGN_TYPE_MONITOR需要分配所有dead的task舞痰。
- 由于當(dāng)前zk中可能有Assignment,其對(duì)應(yīng)的task需要保持不變诀姚,所以匀奏,將這些task查找出來保存在keepAssigns中,然后獲取需要保持的任務(wù)對(duì)應(yīng)的<supervisorid和port> - > task学搜,即每個(gè)WorkerSlot上對(duì)應(yīng)的task。
- 計(jì)算 需要分配的worker數(shù)量 = 所有的worker總數(shù)量 - 已停止的supervisor的worker數(shù)量 - 需要保持的worker數(shù)量
- registerPreAssignHandler方法計(jì)算需要分配的任務(wù)needAssignTasks中每個(gè)組件分配類型ComponentAssignType對(duì)應(yīng)的task论衍。由于task中記錄了組件名稱瑞佩,每個(gè)組件的配置可能會(huì)不同。ComponentAssignType共有三種類型的分配方式:USER_DEFINE, USE_OLD, NORMAL坯台。每種方式都可能會(huì)對(duì)應(yīng)著我們需要分配的若干task炬丸。
- 有了ComponentAssignType之后,就可以為task分配資源了蜒蕾。使用USER_DEFINE時(shí)稠炬,使用UserDefinePreAssign需要從配置文件中讀取需要分配的cpu,內(nèi)存和磁盤的solt咪啡。使用NORMAL時(shí)首启,對(duì)應(yīng)的NormalPreAssign類。先查找是否有用戶自定義的分配方式撤摸,如果沒有毅桃,從配置中獲取task占用的slot數(shù)量褒纲。
cpu.slots.per.task
:每個(gè)task會(huì)使用的cup slot數(shù)量,memory.slots.per.task
每個(gè)task會(huì)使用的內(nèi)存slot數(shù)量钥飞,task.alloc.disk.slot
每個(gè)task會(huì)使用的磁盤slot數(shù)量莺掠。有了slot數(shù)量,還需要注意task.on.differ.node
是否要求task位于不同的node上读宙。滿足slot的Supervisor很多彻秆,這時(shí)候就會(huì)使用總權(quán)重或level來為task選擇最佳的Supervisor摸屠,并在Supervisor的資源池中分配相應(yīng)的資源脂信,并封裝為ResourceAssignment對(duì)象返回。 - 分配完成后蒂窒,執(zhí)行PostAssignTaskPort的postAssign方法膀估,為task分配worker需要的port幔亥。
完成上述兩個(gè)步驟之后,已經(jīng)為Topology分配了所需的資源察纯,包括Supervisor節(jié)點(diǎn)帕棉,worker,內(nèi)存饼记,磁盤香伴,cpu的數(shù)量。
mkAssignment的后續(xù)操作
- updateGroupResource 如果使用了Group模式具则,就需要設(shè)置分組數(shù)據(jù)即纲。
- 計(jì)算supervisor與host的映射:zk中記錄了所有的supervisor,過濾出task需要的supervisorid與host的映射關(guān)系即可。
- 計(jì)算task的starttime:ASSIGN_TYPE_NEW類型為當(dāng)前時(shí)間
- 按照Assignment的構(gòu)造方法博肋,封裝為Assignment對(duì)象
- 將Assignment的數(shù)據(jù)寫入zk的
/jstorm/assignments/{topologyId}
節(jié)點(diǎn)里面 - 更新task的心跳起始時(shí)間
setTopologyStatus
激活Topology:/jstorm/topology/{topologyId}
讀取并反序列化為StormBase對(duì)象的實(shí)例低斋。如果不存在,就創(chuàng)建這個(gè)節(jié)點(diǎn)匪凡,并創(chuàng)建StormBase實(shí)例保存起來膊畴。如果StormBase存在,就更新狀態(tài)病游。
backupAssignment
主要的操作是將Assignment保存在zk上面唇跨。由于上面的過程中已經(jīng)生產(chǎn)了Assignment對(duì)象,遍歷task并構(gòu)造出組件與task的映射關(guān)系后衬衬,封裝為AssignmentBak买猖,保存到/jstorm/assignments_bak/{topologyName}
節(jié)點(diǎn)的內(nèi)容中。