JStorm源碼分析-3.提交Topology

構(gòu)建Topology并在本地測(cè)試后并鸵,我們就可以將工程打包為jar包纽什,并通過jstorm的jar命令提交到集群合溺。這個(gè)過程使用了thrift的遠(yuǎn)程調(diào)用凑队,相關(guān)技術(shù)可以參照http://matt33.com/2016/04/07/thrift-learn/#Thrift%E4%B9%8BHello-World

1.提交命令

將工程打?yàn)閖ar包之后,可以通過下面的命令提交到j(luò)storm集群中忌锯。在提交之前伪嫁,我們需要先啟動(dòng)zookeeper,nimbus和supervisor偶垮。

jstorm jar target/sequence-split-merge-1.0.5-jar-with-dependencies.jar 
com.alipay.dw.jstorm.example.sequence.SequenceTopology sequence_test

2.程序分析

2.1 jstorm腳本

jstorm命令其實(shí)是一個(gè)python腳本张咳,位于jstorm-server項(xiàng)目bin/jstorm.py中。這個(gè)腳本的主要工作是提供了一些子命令似舵,最終會(huì)調(diào)用指定的jar包內(nèi)的main方法脚猾。
在main方法中,會(huì)根據(jù)傳入的參數(shù)查找對(duì)應(yīng)的方法砚哗。例如龙助,jstrom jar命令就是jar對(duì)應(yīng)的jar方法。在這個(gè)腳本中包含了很多子命令蛛芥,可以看到有如下多個(gè)子命令泌参,用來控制jstorm集群的行為脆淹。

COMMANDS = {"jar": jar, "kill": kill, "nimbus": nimbus, "zktool": zktool,
            "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
            "remoteconfvalue": print_remoteconfvalue, "classpath": print_classpath,
            "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage}

jar方法的主要邏輯是拼裝java命令并調(diào)用

def jar(jarfile, klass, *args):
    childopts = "-Dstorm.jar=" + jarfile + (" -Dstorm.root.logger=INFO,stdout -Dlog4j.configuration=File:%s/conf/aloha_log4j.properties"  %JSTORM_DIR)
    exec_storm_class(
        klass,
        jvmtype="-client -Xms256m -Xmx256m",
        extrajars=[jarfile, CONF_DIR, JSTORM_DIR + "/bin", LOG4J_CONF],
        args=args,
        childopts=childopts)

例如,本文jstorm jar的命令最終會(huì)執(zhí)行如下java命令沽一,我們需要到我們編寫main方法中的StormSubmitter查看具體邏輯。

java -client -Xms256m -Xmx256m 
-Djstorm.home=/Users/shishengjie/software/jstorm-0.9.1 
-Dstorm.options= 
-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib 
-Dstorm.jar=target/sequence-split-merge-1.0.5-jar-with-dependencies.jar 
-Dstorm.root.logger=INFO,stdout 
-Dlog4j.configuration=File:/Users/shishengjie/software/jstorm-0.9.1/conf/aloha_log4j.properties 
-cp /.../....jar:target/sequence-split-merge-1.0.5-jar-with-dependencies.jar:/Users/shishengjie/.jstorm:/Users/shishengjie/software/jstorm-0.9.1/bin:/Users/shishengjie/software/jstorm-0.9.1/conf/jstorm.log4j.properties 
com.alipay.dw.jstorm.example.sequence.SequenceTopology 
"sequence_test"

2.2 StormSubmitter

StormSubmitter類定義在jstorm-client工程中漓糙,submitTopology方法用來向運(yùn)行中的jstorm集群提交Topology铣缠。

public static void submitTopology(String name, Map stormConf,
            StormTopology topology, SubmitOptions opts)
            throws AlreadyAliveException, InvalidTopologyException,
            TopologyAssignException {}

提交時(shí)我們可以看到,需要指定拓?fù)涿Q和配置昆禽,StormTopology與SubmitOptions類的實(shí)例蝗蛙。前面幾個(gè)參數(shù)我們都知道是如何設(shè)置的,SubmitOptions是thrift定義的一個(gè)對(duì)象醉鳖,用來表示拓?fù)涫欠窦せ睢?/p>

struct SubmitOptions {
  1: required TopologyInitialStatus initial_status;
}
enum TopologyInitialStatus {
    ACTIVE = 1,
    INACTIVE = 2
}

// java中使用
SubmitOptions submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);

提交拓?fù)涞闹饕壿嫞?/p>

  1. 參數(shù)的處理捡硅,開發(fā)者設(shè)置的stormConf并不完整,因此需要進(jìn)行處理盗棵。首先會(huì)校驗(yàn)stormConf格式是否正確壮韭,stormConf需要是一個(gè)Json序列化格式,然后從命令行讀取storm.options屬性的內(nèi)容纹因,覆蓋stormConf喷屋;接下來分別讀取defaults.yaml和從storm.conf.file指定的storm配置文件中讀取配置(如果沒有指定,則從classpath中查找storm.yaml文件瞭恰,jstorm-server中有一個(gè)名為storm.yaml的文件屯曹,里面是默認(rèn)配置)。讀取完文件的配置后惊畏,再次讀取命令行指定的配置恶耽,保存為conf,并將stormConf的內(nèi)容覆蓋conf颜启,這樣就完成了補(bǔ)充配置的工作偷俭,stormConf是用戶指定配置的,conf是補(bǔ)充后的完整配置农曲。最后社搅,設(shè)置用戶分組參數(shù),將stormConf序列化為json字符串乳规。
    2.使用NimbusClient的靜態(tài)方法從conf中獲取NimbusClient實(shí)例
    3.調(diào)用NimbusClient的getClusterInfo方法向服務(wù)器獲取集群信息形葬,校驗(yàn)指定的拓?fù)涿Q是否已經(jīng)存在。
    4.使用NimbusClient將storm.jar指定的jar包上傳到服務(wù)器暮的,storm.jar是在jstorm.py中設(shè)置的笙以。
    5.使用NimbusClient的submitTopology方法提交拓?fù)洹?/li>

至此,jstorm完成了jar包和拓?fù)涞奶峤欢潮纭N覀兛梢钥吹饺渴峭ㄟ^NimbusClient來完成的猖腕。有client端就有server端拆祈,這就涉及到了jsorm的架構(gòu)。storm命令的python腳本都是作為client端發(fā)送請(qǐng)求的倘感,而使用jstorm nimbus啟動(dòng)的進(jìn)程則是服務(wù)端放坏,用來處理client請(qǐng)求。

2.3 Nimbus

jstorm的nimbus命令會(huì)啟動(dòng)一個(gè)服務(wù)器進(jìn)程老玛,其提供了很多thrift服務(wù)淤年,用來管理集群中拓?fù)洌蝿?wù)蜡豹,worker等狀態(tài)麸粮。Nimbus也定義在strom.thrift中,聲明了其提供的很多服務(wù)镜廉。NimbusClient是jstorm封裝的客戶端弄诲,用來向服務(wù)器發(fā)出請(qǐng)求。
下面是使用thrift定義的nimbus提供的服務(wù)娇唯。

service Nimbus {
// 拓?fù)湎嚓P(guān)
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: TopologyAssignException tae);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3:TopologyAssignException tae);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // 上傳文件
  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);
  
// 下載文件
  string beginFileDownload(1: string file);
  //can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // 獲取集群狀態(tài)和拓?fù)錉顟B(tài)
  string getNimbusConf();
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  SupervisorWorkers getSupervisorWorkers(1: string host) throws (1: NotAliveException e);
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

NimbusClient的獲取

NimbusClient繼承了ThriftClient類齐遵,創(chuàng)建時(shí)傳入了conf作為配置。主要的初始化方法在ThriftClient的構(gòu)造方法中视乐。

  1. 獲取master地址:初始化時(shí)洛搀,首先會(huì)根據(jù)conf獲取zookeeper的配置,然后創(chuàng)建Curator客戶端佑淀,由于nimbus在啟動(dòng)后創(chuàng)建/jstorm/master節(jié)點(diǎn)并將master的host和ip寫入節(jié)點(diǎn)留美,nimbus的client啟動(dòng)時(shí)會(huì)對(duì)該節(jié)點(diǎn)進(jìn)行檢查,之后會(huì)監(jiān)聽這個(gè)節(jié)點(diǎn)伸刃,發(fā)送變化時(shí)會(huì)flushClient谎砾。
  2. 創(chuàng)建客戶端:主要的邏輯在flushClient中,會(huì)從節(jié)點(diǎn)中讀取host和port捧颅,然后使用thrift的接口創(chuàng)建連接景图。
  3. 創(chuàng)建完成后,NimbusClient通過flush方法獲取了與服務(wù)端通信的Nimbus.Client碉哑。

2.4 NimbusServer

由于客戶端所做的事情只是調(diào)用挚币,具體的邏輯是在服務(wù)端實(shí)現(xiàn)的,因此有必要對(duì)服務(wù)端進(jìn)行分析扣典。我們?cè)谑褂?code>jstorm nimbus命令時(shí)妆毕,最終會(huì)調(diào)用com.alibaba.jstorm.daemon.nimbus.NimbusServer,里面有main方法贮尖,可以啟動(dòng)一個(gè)服務(wù)器進(jìn)程笛粘。

public static void main(String[] args) throws Exception {
    // read configuration files
    @SuppressWarnings("rawtypes")
    Map config = Utils.readStormConfig();
    NimbusServer instance = new NimbusServer();
    INimbus iNimbus = new DefaultInimbus();
    instance.launchServer(config, iNimbus);
}

服務(wù)器的相關(guān)設(shè)計(jì)我們?cè)谙乱黄臋n中介紹,現(xiàn)在主要是對(duì)服務(wù)器處理提交拓?fù)淙蝿?wù)的處理進(jìn)行分析。按照thrift的定義薪前,我們?cè)?code>initThrift中找到了處理遠(yuǎn)程調(diào)用的類是ServiceHandler润努。
ServiceHandler的submitTopologyWithOpts方法中,提交topology示括。

ServiceHandler的主要邏輯包括:生成標(biāo)準(zhǔn)化的Topology實(shí)例并注冊(cè)到zk上铺浇;將Topology需要的jar包、Topology實(shí)例和配置通過復(fù)制和序列化保存到本地路徑上垛膝;創(chuàng)建Task并注冊(cè)到zk上随抠,task節(jié)點(diǎn)的內(nèi)容為TaskInfo即任務(wù)對(duì)應(yīng)的組件名稱;包裝一個(gè)TopologyEvent對(duì)象提交給TopologyAssign線程處理繁涂,由其完成任務(wù)的資源分配。

  • checkTopologyActive 檢查topologyName是否存在二驰,NimbusData中包含一個(gè)用于處理zookeeper的類StormZkClusterState扔罪,首先從/jstorm/topology里面獲取激活狀態(tài)的拓?fù)淞斜恚闅v這個(gè)列表桶雀,如果節(jié)點(diǎn)名稱包含topologyName矿酵,則獲取節(jié)點(diǎn)內(nèi)部數(shù)據(jù),將其反序列化為StormBase類矗积,如果name一致全肮,說明已經(jīng)存在。
  • NimbusData中保存了提交的總數(shù)量棘捣,對(duì)其加1辜腺;之后,構(gòu)造topologyId乍恐,為topologyname-次序-時(shí)間戳评疗。將NimbusData中jstorm的conf與提交的stormConf合并,由于配置被修改了茵烈,對(duì)提交的StormTopology對(duì)象重新構(gòu)造為一個(gè)標(biāo)準(zhǔn)化后的StormTopology百匆。
  • setupStormCode對(duì)Nimbus所在服務(wù)器創(chuàng)建路徑${storm.local.dir}/nimbus/stormdist/{topologyId},將上傳的jar包復(fù)制過來呜投,文件名為stormjar.jar加匈;將標(biāo)準(zhǔn)化后的StormTopology序列化到{storm.local.dir}/nimbus/{topologyId}/stormcode.ser文件中,將標(biāo)準(zhǔn)化后的StormTopology序列化到{storm.local.dir}/nimbus/{topologyId}/stormconf.ser文件中仑荐。
  • setupZkTaskInfo針對(duì)bolt和spout組件生成TaskInfo雕拼,首先在zk上創(chuàng)建/jstorm/taskbeats/{topoologyId},讀取stormconf.ser文件释漆,反序列化為stormConf悲没;讀取stormcode.ser文件,反序列化為StormTopology實(shí)例;然后為topology添加Acker示姿,Acker實(shí)現(xiàn)了IBolt甜橱,用來跟蹤消息處理情況并通知spout;再為topology中的所有組件添加一個(gè)名為__system的stream栈戳。最后遍歷topology中的組件岂傲,根據(jù)設(shè)置的并發(fā)度,保存到taskToComponetId中子檀,例如镊掖,spout并發(fā)度為2,那么會(huì)保存1->spout 2->spout褂痰。有了這個(gè)TreeMap亩进,遍歷并創(chuàng)建TaskInfo,在zk上創(chuàng)建/jstorm/tasks/{topoologyId}/{taskId},將TaskInfo序列化保存到節(jié)點(diǎn)里面缩歪。例如归薛,spout會(huì)在創(chuàng)建兩個(gè)節(jié)點(diǎn),節(jié)點(diǎn)內(nèi)容為TaskInfo匪蝙。
  • 為topology分配任務(wù)主籍,TopologyAssign是一個(gè)線程,會(huì)不斷地從queue中獲取TopologyAssignEvent對(duì)象并進(jìn)行任務(wù)分配逛球,所以在提交提交任務(wù)的時(shí)候千元,只需要?jiǎng)?chuàng)建TopologyAssignEvent對(duì)象并push到queue中。而實(shí)際的任務(wù)分配則是在TopologyAssign中完成颤绕。在NimbusServer啟動(dòng)時(shí)的initTopologyAssign方法中幸海,會(huì)啟動(dòng)TopologyAssign線程。

2.4 TopologyAssign

TopologyAssign線程的主要工作就是處理Topology的任務(wù)分配屋厘,主要邏輯在doTopologyAssignment方法中涕烧。流程是先根據(jù)TopologyAssignEvent生成Assignment,然后將Assignment備份到zk中汗洒。在分析之前议纯,需要先對(duì)下面幾個(gè)對(duì)象有所了解

TopologyAssignContext

在處理分配拓?fù)渲埃覀円延械馁Y源是Topology實(shí)例和配置信息溢谤。TopologyAssignContext的目的是將分配拓?fù)渌璧臄?shù)據(jù)都維護(hù)起來瞻凤。TopologyAssignContext內(nèi)部保存了拓?fù)浞峙涞南嚓P(guān)上下文:

  • assignType - 分配的類型,包括ASSIGN_TYPE_NEW-新建世杀,ASSIGN_TYPE_REBALANCE-重新平衡阀参,ASSIGN_TYPE_MONITOR-監(jiān)聽。
  • rawTopology - 未被修改前的StormTopology實(shí)例瞻坝;
  • stormConf - 拓?fù)涞呐渲茫?/li>
  • oldAssignment - 拓?fù)湟呀?jīng)存在的Assignment實(shí)例蛛壳;
  • SupervisorInfo - 當(dāng)前supervisor運(yùn)行信息;
  • taskToComponent - task與組件名稱的映射;
  • allTaskIds - 所有的task衙荐;
  • deadTaskIds - 已經(jīng)不運(yùn)行的task捞挥;
  • unstoppedTaskIds - 雖然活著但所在supervisor停止運(yùn)行的task
  • cluster - storm集群中的SupervisorInfo

DefaultTopologyAssignContext

DefaultTopologyAssignContext繼承了TopologyAssignContext類,

  • sysTopology - Storm會(huì)對(duì)rawTopology添加ack等機(jī)制忧吟,生成新的Topology實(shí)例
  • sidToHostname - supervisorid -> hostname的映射
  • hostToSid - hostname -> supervisorid的映射
  • oldWorkerTasks - zk中記錄了Topology舊的Assignment砌函,內(nèi)部包含task的列表
  • componentTasks - 組件名稱 -> taskid的映射
  • unstoppedAssignments - 已經(jīng)停止的supervisor中的task
  • totalWorkerNum - worker總數(shù),默認(rèn)大小為min({topologt.workers}溜族,所有組件的并發(fā)數(shù)之和)
  • unstoppedWorkerNum - 已經(jīng)停止的supervisor中的task數(shù)量
  • DISK_WEIGHT - 磁盤的權(quán)重
  • CPU_WEIGHT - CPU的權(quán)重
  • MEM_WEIGHT - 內(nèi)存的權(quán)重
  • PORT_WEIGHT - 端口的權(quán)重
  • TASK_ON_DIFFERENT_NODE_WEIGHT - task運(yùn)行在不同節(jié)點(diǎn)上的權(quán)重
  • USE_OLD_ASSIGN_RATIO_WEIGHT - 使用舊的分配率的權(quán)重
  • USER_DEFINE_ASSIGN_RATIO_WEIGHT - 使用用戶定義的分配率的權(quán)重
  • DEFAULT_WEIGHT - 默認(rèn)的權(quán)重

DefaultTopologyScheduler
DefaultTopologyScheduler實(shí)現(xiàn)了IToplogyScheduler接口讹俊,IToplogyScheduler只有一個(gè)方法

Map<Integer, ResourceAssignment> assignTasks(TopologyAssignContext contex) 
            throws FailedAssignTopologyException;

也就是說DefaultTopologyScheduler的主要功能就是根據(jù)TopologyAssignContext為task分配資源,如指定所在的supervisor煌抒,并在Supervisor的資源池中為task分配所需的資源仍劈。assignTasks方法是處理拓?fù)涞暮诵墓δ堋?/p>

ResourceAssignment
每個(gè)task會(huì)被分配給一個(gè)ResourceAssignment實(shí)例,ResourceAssignment包含了task所在supervisorId和磁盤寡壮,cpu耳奕,內(nèi)存這些資源所在的slot,以及端口port诬像。

  • supervisorId - supervisor的id
  • hostname - 所在的機(jī)器
  • diskSlot - 磁盤的slot
  • cpuSlotNum - cpu的slot數(shù)量
  • memSlotNum - 內(nèi)存的slot數(shù)量
  • port - 端口

Assignment

每個(gè)Topology都需要被nimbus分配給supervisor執(zhí)行,分配的元數(shù)據(jù)就保存在Assignment對(duì)象中闸婴。通過zk的節(jié)點(diǎn)/jstorm/assignments/{topologyid}傳遞給Supervisor坏挠,Assignment內(nèi)部包括:

  • nodeHost - 保存{supervisorid: hostname} -- 分配的supervisor
  • taskStartTimeSecs - 每個(gè)task的啟動(dòng)時(shí)間,{taskid, taskStartSeconds
  • masterCodeDir - topology的代碼路徑
  • taskToResource - 每個(gè)task對(duì)應(yīng)的ResourceAssignment

處理流程

mkAssignment的prepareTopologyAssign

prepareTopologyAssign的主要作用是收集jstorm集群當(dāng)前的上下文數(shù)據(jù)邪乍,包括:拓?fù)涿Q降狠、標(biāo)準(zhǔn)化后的Topology、Topology配置庇楞、Supervisor列表榜配、task列表、已經(jīng)存在的Assignment吕晌。

  • 從本地路徑讀取topology文件蛋褥,然后反序列化為StormTopology,保存到rawTopology中睛驳;再組裝topology的stormConf烙心;
  • 然后,從zk上獲取所有Supervisor乏沸,如果沒有Supervisor存在淫茵,會(huì)拋出異常。
  • 從zk上獲取task列表蹬跃,保存到TaskToComponent和AllTaskIds中匙瘪;有了拓?fù)涞膖ask列表之后,從zk中獲取拓?fù)涞腁ssignment信息。如果已經(jīng)存在了丹喻,則會(huì)根據(jù)task的心跳和Supervisor的運(yùn)行狀態(tài)薄货,區(qū)分出還在運(yùn)行中的和已經(jīng)停止運(yùn)行的task,分別保存到aliveTasks和unstoppedTasks中驻啤。具體邏輯是:從/jstorm/taskbeats/{topoologyId}節(jié)點(diǎn)下查找拓?fù)渲邪膖ask記錄到allTaskIds中菲驴;通過保持心跳的task記錄,可以將aliveTasks從allTaskIds過濾出來骑冗;由于Supervisor可能已經(jīng)停止運(yùn)行赊瞬,所以還需要找出來其包含的task,具體是通過zk中舊的Assignment中維護(hù)了task和其對(duì)應(yīng)的Supervisor(getTaskToResource的ResourceAssignment中)贼涩,如果Supervisor已經(jīng)不在運(yùn)行了巧涧,就將這些task保存到unstoppedTasks中。有了這些信息遥倦,deadTasks就是allTaskId減去aliveTasks的集合谤绳。
  • 如果zk上沒有拓?fù)鋵?duì)應(yīng)的舊Assignment信息,就不需要找出deadTasks和unstoppedTasks袒哥,這兩個(gè)集合都為空了缩筛。
  • 為Supervisor收集未使用的slot,zk上記錄了所有的Supervisor和Assignment堡称,而Assignment中記錄了使用的資源瞎抛,這樣就可以分配已使用的資源,剩余的就是未使用的資源却紧。首先遍歷zk上注冊(cè)的所有的Assignment桐臊,由于Assignment中包含了task與ResourceAssignment的映射,而ResourceAssignment中包含了task所在的Supervisor的id晓殊,因此可以獲取到每個(gè)task的SupervisorInfo断凶,SupervisorInfo中維護(hù)了Supervisor的資源池(cpu,內(nèi)存巫俺,磁盤和網(wǎng)絡(luò))认烁,在池中分配task所需的資源。
  • 設(shè)置拓?fù)湟延械腁ssignment介汹,保存到OldAssignment中砚著,如果拓?fù)涞腁ssignment不存在,會(huì)從zk中獲取AssignmentBak痴昧。在這個(gè)過程中稽穆,也會(huì)設(shè)置AssignType的類型。

mkAssignment的IToplogyScheduler

收集完TopologyContext之后赶撰,就可以使用調(diào)度器來為任務(wù)分配資源了舌镶。IToplogyScheduler是默認(rèn)的用來調(diào)度拓?fù)涞念愔梗趇nit方法中被創(chuàng)建,使用的是DefaultTopologyScheduler類餐胀。這個(gè)類的作用是為task分配指定的資源哟楷。

分配過程為:

  1. 檢查分配類型是否為ASSIGN_TYPE_NEW、ASSIGN_TYPE_REBALANCE否灾、ASSIGN_TYPE_MONITOR三者之一卖擅;
  2. 在TopologyAssignContext的基礎(chǔ)上創(chuàng)建DefaultTopologyAssignContext,內(nèi)部包含了分配task資源需要的weight配置
  3. 如果是REBALANCE類型的墨技,需要先釋放掉已有的資源惩阶。context中保存了所有的SupervisorInfo,zk中記錄了原有的Assignment扣汪,內(nèi)部包含了task使用資源的情況断楷,這樣就可以對(duì)SupervisorInfo內(nèi)的資源池進(jìn)行釋放。
  4. 統(tǒng)計(jì)需要分配的task崭别,保存在needAssignTasks中冬筒。ASSIGN_TYPE_NEW類型的時(shí)候所有都需要分配;ASSIGN_TYPE_REBALANCE 需要分配(所有task 減去 已經(jīng)停止的supervisor的task)茅主;ASSIGN_TYPE_MONITOR需要分配所有dead的task舞痰。
  5. 由于當(dāng)前zk中可能有Assignment,其對(duì)應(yīng)的task需要保持不變诀姚,所以匀奏,將這些task查找出來保存在keepAssigns中,然后獲取需要保持的任務(wù)對(duì)應(yīng)的<supervisorid和port> - > task学搜,即每個(gè)WorkerSlot上對(duì)應(yīng)的task。
  6. 計(jì)算 需要分配的worker數(shù)量 = 所有的worker總數(shù)量 - 已停止的supervisor的worker數(shù)量 - 需要保持的worker數(shù)量
  7. registerPreAssignHandler方法計(jì)算需要分配的任務(wù)needAssignTasks中每個(gè)組件分配類型ComponentAssignType對(duì)應(yīng)的task论衍。由于task中記錄了組件名稱瑞佩,每個(gè)組件的配置可能會(huì)不同。ComponentAssignType共有三種類型的分配方式:USER_DEFINE, USE_OLD, NORMAL坯台。每種方式都可能會(huì)對(duì)應(yīng)著我們需要分配的若干task炬丸。
  8. 有了ComponentAssignType之后,就可以為task分配資源了蜒蕾。使用USER_DEFINE時(shí)稠炬,使用UserDefinePreAssign需要從配置文件中讀取需要分配的cpu,內(nèi)存和磁盤的solt咪啡。使用NORMAL時(shí)首启,對(duì)應(yīng)的NormalPreAssign類。先查找是否有用戶自定義的分配方式撤摸,如果沒有毅桃,從配置中獲取task占用的slot數(shù)量褒纲。cpu.slots.per.task:每個(gè)task會(huì)使用的cup slot數(shù)量,memory.slots.per.task每個(gè)task會(huì)使用的內(nèi)存slot數(shù)量钥飞,task.alloc.disk.slot每個(gè)task會(huì)使用的磁盤slot數(shù)量莺掠。有了slot數(shù)量,還需要注意task.on.differ.node是否要求task位于不同的node上读宙。滿足slot的Supervisor很多彻秆,這時(shí)候就會(huì)使用總權(quán)重或level來為task選擇最佳的Supervisor摸屠,并在Supervisor的資源池中分配相應(yīng)的資源脂信,并封裝為ResourceAssignment對(duì)象返回。
  9. 分配完成后蒂窒,執(zhí)行PostAssignTaskPort的postAssign方法膀估,為task分配worker需要的port幔亥。

完成上述兩個(gè)步驟之后,已經(jīng)為Topology分配了所需的資源察纯,包括Supervisor節(jié)點(diǎn)帕棉,worker,內(nèi)存饼记,磁盤香伴,cpu的數(shù)量。

mkAssignment的后續(xù)操作

  1. updateGroupResource 如果使用了Group模式具则,就需要設(shè)置分組數(shù)據(jù)即纲。
  2. 計(jì)算supervisor與host的映射:zk中記錄了所有的supervisor,過濾出task需要的supervisorid與host的映射關(guān)系即可。
  3. 計(jì)算task的starttime:ASSIGN_TYPE_NEW類型為當(dāng)前時(shí)間
  4. 按照Assignment的構(gòu)造方法博肋,封裝為Assignment對(duì)象
  5. 將Assignment的數(shù)據(jù)寫入zk的/jstorm/assignments/{topologyId}節(jié)點(diǎn)里面
  6. 更新task的心跳起始時(shí)間

setTopologyStatus

激活Topology:/jstorm/topology/{topologyId}讀取并反序列化為StormBase對(duì)象的實(shí)例低斋。如果不存在,就創(chuàng)建這個(gè)節(jié)點(diǎn)匪凡,并創(chuàng)建StormBase實(shí)例保存起來膊畴。如果StormBase存在,就更新狀態(tài)病游。

backupAssignment

主要的操作是將Assignment保存在zk上面唇跨。由于上面的過程中已經(jīng)生產(chǎn)了Assignment對(duì)象,遍歷task并構(gòu)造出組件與task的映射關(guān)系后衬衬,封裝為AssignmentBak买猖,保存到/jstorm/assignments_bak/{topologyName}節(jié)點(diǎn)的內(nèi)容中。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末滋尉,一起剝皮案震驚了整個(gè)濱河市玉控,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌狮惜,老刑警劉巖奸远,帶你破解...
    沈念sama閱讀 217,084評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件既棺,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡懒叛,警方通過查閱死者的電腦和手機(jī)丸冕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來薛窥,“玉大人胖烛,你說我怎么就攤上這事∽缑裕” “怎么了佩番?”我有些...
    開封第一講書人閱讀 163,450評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)罢杉。 經(jīng)常有香客問我趟畏,道長(zhǎng),這世上最難降的妖魔是什么滩租? 我笑而不...
    開封第一講書人閱讀 58,322評(píng)論 1 293
  • 正文 為了忘掉前任赋秀,我火速辦了婚禮,結(jié)果婚禮上律想,老公的妹妹穿的比我還像新娘猎莲。我一直安慰自己,他們只是感情好技即,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,370評(píng)論 6 390
  • 文/花漫 我一把揭開白布著洼。 她就那樣靜靜地躺著,像睡著了一般而叼。 火紅的嫁衣襯著肌膚如雪身笤。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,274評(píng)論 1 300
  • 那天葵陵,我揣著相機(jī)與錄音液荸,去河邊找鬼。 笑死埃难,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的涤久。 我是一名探鬼主播涡尘,決...
    沈念sama閱讀 40,126評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼响迂!你這毒婦竟也來了考抄?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,980評(píng)論 0 275
  • 序言:老撾萬榮一對(duì)情侶失蹤蔗彤,失蹤者是張志新(化名)和其女友劉穎川梅,沒想到半個(gè)月后疯兼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,414評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡贫途,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,599評(píng)論 3 334
  • 正文 我和宋清朗相戀三年吧彪,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片丢早。...
    茶點(diǎn)故事閱讀 39,773評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡姨裸,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出怨酝,到底是詐尸還是另有隱情傀缩,我是刑警寧澤,帶...
    沈念sama閱讀 35,470評(píng)論 5 344
  • 正文 年R本政府宣布农猬,位于F島的核電站赡艰,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏斤葱。R本人自食惡果不足惜慷垮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,080評(píng)論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望苦掘。 院中可真熱鬧换帜,春花似錦、人聲如沸鹤啡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽递瑰。三九已至祟牲,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間抖部,已是汗流浹背说贝。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留慎颗,地道東北人乡恕。 一個(gè)月前我還...
    沈念sama閱讀 47,865評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像俯萎,于是被迫代替她去往敵國(guó)和親傲宜。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,689評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 這是一個(gè)JStorm使用教程夫啊,不包含環(huán)境搭建教程函卒,直接在公司現(xiàn)有集群上跑任務(wù),關(guān)于JStorm集群環(huán)境搭建撇眯,后續(xù)研...
    Coselding閱讀 6,330評(píng)論 1 9
  • Date: Nov 17-24, 2017 1. 目的 積累Storm為主的流式大數(shù)據(jù)處理平臺(tái)對(duì)實(shí)時(shí)數(shù)據(jù)處理的相關(guān)...
    一只很努力爬樹的貓閱讀 2,173評(píng)論 0 4
  • 高階練習(xí)冊(cè)第一章成為和接受問題 我在拒絕成為什么报嵌?如果我成為了它虱咧,那將在我的生命中創(chuàng)造很多很多有的金錢? 擁有一個(gè)...
    羅蓓閱讀 926評(píng)論 1 2
  • 蘭佳閱讀 253評(píng)論 0 0
  • 五一小長(zhǎng)假,帶著小雨回娘家锚国。她已經(jīng)很獨(dú)立了腕巡,主動(dòng)找外公外婆玩,基本不來粘我跷叉∫荼ⅲ看著這個(gè)小人跟周圍的家...
    betty海丹閱讀 774評(píng)論 0 1