在jstorm的架構(gòu)中娇未,Nimbus的作用是用來操作Topology墨缘、上傳/下載文件、獲取集群狀態(tài)和拓?fù)錉顟B(tài)等忘蟹。主要作用是接收client的請求飒房,并在計(jì)算之后,修改zk上的數(shù)據(jù)媚值。
1.啟動(dòng)
使用下面的命令就可以啟動(dòng)nimbus服務(wù)器端了
jstorm nimbus
jstorm是一個(gè)python腳本狠毯,會(huì)組裝java命令后啟動(dòng)NimbusServer
java -server
-Djstorm.home=/Users/shishengjie/software/jstorm-0.9.1
-Dstorm.options=
-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
-Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70
-Dlogfile.name=nimbus.log
-Dlog4j.configuration=File:/Users/shishengjie/software/jstorm-0.9.1/conf/jstorm.log4j.properties
-cp /Users/shishengjie/software/jstorm-0.9.1/jstorm-client-extension-0.9.1.jar:/Users/shishengjie/software/jstorm-0.9.1/jstorm-client-0.9.1.jar:/Users/shishengjie/software/jstorm-0.9.1/jstorm-server-0.9.1.jar:/Users/shishengjie/software/jstorm-0.9.1/lib/kryo-2.17.jar:/
com.alibaba.jstorm.daemon.nimbus.NimbusServer
在使用idea調(diào)試的時(shí)候,我們需要把相關(guān)的參數(shù)設(shè)置進(jìn)去褥芒。
2. NimbusServer
2.1 啟動(dòng)過程
NimbusServer的啟動(dòng)過程主要是在launchServer
方法中嚼松,
- 讀取配置嫡良,首先從
defaults.yaml
中讀取默認(rèn)配置;然后從storm.conf.file
指定的文件(如果沒有献酗,從classpath查找storm.yaml)寝受;最后從命令行讀取storm.options
中的配置,并依次覆蓋罕偎。 - 檢查conf配置是否為集群模式很澄,
storm.cluster.mode
默認(rèn)是distributed - 初始化關(guān)閉hook,會(huì)在jvm關(guān)閉時(shí)執(zhí)行cleanup方法
- 創(chuàng)建NimbusData颜及,主要是初始化兩個(gè)用于上傳和下載的map甩苛,這個(gè)map帶有過期清除功能,過期后可以回調(diào)指定方法俏站。
- 使用Curator客戶端創(chuàng)建
leaderSelector
讯蒲,用于nimbus客戶端選擇leader,選舉成為leader后會(huì)調(diào)用listener中的方法肄扎。 - 如果未獲得leader角色墨林,會(huì)初始化follower的線程,該線程的主要工作是清理已經(jīng)分配完成的本地文件,從master下載文件犯祠。
2.2 leader
成為leader后旭等,nimbus服務(wù)器需要很多初始化操作:
- 獲取hostname和port(配置:nimbus.thrift.port默認(rèn)為7627),將master的host和port寫入到zk下面的/jstorm/master節(jié)點(diǎn)的內(nèi)容里雷则;
- cleanupCorruptTopologies清除中斷的拓?fù)淞疚恚趜k中有記錄但是本地dir上面沒有的拓?fù)浔徽J(rèn)為是被中斷的。在zk中找 /jstorm/topology/下面的節(jié)點(diǎn)月劈,本地路徑
/local-storm-dir/nimbus/stormdist
下面是拓?fù)淞斜恚瑢k中的列表移除本地有的,剩下都是本地沒有的藤乙,清除的操作就是從zk中移除節(jié)點(diǎn)猜揪。 - initGroup初始化group: 如果配置了分組模式(nimbus.groupfile.path中指定了文件路徑,如conf下的group.ini)那么會(huì)解析這個(gè)文件坛梁,并將group信息保存到NimbusData中而姐。
- initTopologyAssign初始化TopologyAssign,這是一個(gè)單例,TopologyAssign線程,用于為topology的task分配資源
- initTopologyStatus,初始化狀態(tài),更新 /jstorm/topology/的子節(jié)點(diǎn)的StormBase的狀態(tài)為StatusType.startup
- initMonitor初始化監(jiān)視器,nimbus.monitor.freq.secs 10s運(yùn)行一次,根據(jù)心跳時(shí)間查找dead的task,修改zk中topology的狀態(tài)為monitor
- initCleaner初始化清理器划咐,每隔一段時(shí)間(默認(rèn)600秒)拴念,清理/LOCAL-DIR/nimbus/inbox中上傳的jar包
- 創(chuàng)建ServiceHandler,其實(shí)現(xiàn)了thrift的nimbus服務(wù),處理client的請求
- initThrift初始化thrift服務(wù)器褐缠,用來處理client端的請求
3. 提供的服務(wù)
3.1 getClusterInfo
這個(gè)服務(wù)用于從zk中獲取集群當(dāng)前狀態(tài):
- 從zk獲取
/jstorm/topology/
下獲取子節(jié)點(diǎn)和里面的內(nèi)容StormBase政鼠,遍歷topology,獲取每個(gè)拓?fù)涞腁ssignment队魏,獲取group信息公般,封裝到TopologySummary - 獲取Supervisor的狀態(tài),從
/jstorm/supervisors
下的所有子節(jié)點(diǎn),內(nèi)容是SupervisorInfo,將supervisors和assignments封裝為SupervisorSummary - 封裝為ClusterSummary返回
3.2 上傳文件
beginFileUpload
開始上傳文件主要確定了文件的上傳后的路徑和文件名:{localdir}/nimbus/inbox/stormjar-{uuid}.jar
官帘,然后創(chuàng)建操作文件的Channel瞬雹,加入到Uploader中。
uploadhunk
傳入文件的全路徑后和寫入的byte數(shù)組刽虹,從Uploader獲取Channel酗捌,將數(shù)據(jù)寫入文件
finishFileUpload
結(jié)束上傳,從Uploader獲取Channel涌哲,關(guān)閉意敛。
3.3 下載文件
beginFileDownload
創(chuàng)建BufferFileInputStream,然后生成一個(gè)uuid作為id返回給client端膛虫。將id和BufferFileInputStream實(shí)例保存到Downloaders中草姻。
downloadChunk
client端傳入了id,從Downloaders中獲取BufferFileInputStream稍刀,然后讀取一個(gè)chunk放入byte數(shù)組并返回撩独。
3.4 killTopologyWithOpts
首先,確定topology是否處于激活狀態(tài)账月,再修改/jstorm/topology/{topologyId}的內(nèi)容是StormBase的狀態(tài)為StatusType.kill
3.5 activate與deactivate
修改/jstorm/topology/{topologyId}的內(nèi)容是StormBase的狀態(tài)為StatusType.activate和StatusType.inactivate
3.6 rebalance
首先综膀,確定topology是否處于激活狀態(tài),再修改/jstorm/topology/{topologyId}的內(nèi)容是StormBase的狀態(tài)為StatusType.rebalance
可以看到局齿,nimbus的主要作用是獲取配置剧劝,修改zk的配置,具體的實(shí)現(xiàn)還需要Supervisor進(jìn)行相應(yīng)的操作抓歼。下一節(jié)讥此,我們分析Supervisor的啟動(dòng)和工作方式。