轉(zhuǎn)述自:Lifecycle of a Storm Topology
本文介紹的storm topology生命周期是基于0.7.1版本的,之后版本可能已發(fā)生了一些變化
我們從執(zhí)行storm jar命令提交topology給nimbus開始炮障,到supervisor啟動或停止worker泊业,再到task執(zhí)行整個過程進行描述衙傀,這其中也包括nimbus是如何監(jiān)控topology的乔煞。
關(guān)于topology的兩點說明:
1. 實際運行中的topology與我們看到的是不同的。運行過程中會有stream和acker bolt加入進來以保證數(shù)據(jù)處理的可靠性吱晒,system-topology函數(shù)負責(zé)topology的創(chuàng)建
2.system-topology用在a. nimbus創(chuàng)建task時 b.worker route消息時
啟動topology
storm jar命令會設(shè)置storm.jar環(huán)境變量在StormSubmitter上傳jar時使用,?然后帶著命令行參數(shù)執(zhí)行指定的class联四。StormSubmitter.submitTopology按以下步驟執(zhí)行:
? ? * ?upload未上傳過的jar文件
? ? * 使用nimbus的thrift接口實現(xiàn)uploading jars
? ? * uploadChunk每次上傳15kb的數(shù)據(jù)
? ? * 上傳完畢時調(diào)用finishFileUpload
? ? * topology的配置用json格式序列化
nimbus接收topology提交的請求撑碴,并對每個topology的配置進行規(guī)范格式化,完成topology一些靜態(tài)屬性的設(shè)置:
? ? * jars和configs存放在本地文件系統(tǒng)中朝墩,具體為:{nimbus local dir}/stormdist/{topology id}
? ? * setup-storm-static 將task--->component的映射寫入zookeeper
? ? * setup-heartbeats在zk中創(chuàng)建一個目錄來存放task心跳
nimbus調(diào)用mk-assignment給各個節(jié)點機分配任務(wù)醉拓,使用到以下信息:
? ? * master-code-dir: ?supervisors用來下載jars/configs
? ? * task->node+port: 任務(wù)id到worker的映射關(guān)系,worker由(node,port)對來標(biāo)識
? ? * node->host: node id到hostname的映射關(guān)系收苏。workers用這個映射關(guān)系來與其他worker進行通信廉嚼,node id用來標(biāo)識supervisors,因為多個supervisors可以運行在同一臺機器上
? ? * task->start-time-secs: 任務(wù)啟動的時間戳倒戏,nimbus用來監(jiān)控topology怠噪,launch time out需要設(shè)置的比心跳超時時間大一些,因為啟動時有很多初始任務(wù)要做杜跷,由nimbus.task.launch.secs設(shè)定
任務(wù)分配完處于deactivated模式傍念,start-storm將相關(guān)數(shù)據(jù)寫到zk之后進入active模式spouts開始emit tuples
supervisor默默的做兩件事:
? ? * 調(diào)用synchronize-supervisor,zk任務(wù)分配變化時就會執(zhí)行葛闷,另外每10s也會定時執(zhí)行憋槐,執(zhí)行時下載新的topology jars,將node要執(zhí)行的任務(wù)寫到本地文件系統(tǒng),其實是一個映射關(guān)系 port->localAssignment, LocalAssignment包含一個topo id還有task ids
? ? * 調(diào)用sync-processes, ?讀取第一件事寫到本地文件的內(nèi)容并與運行的topology對比以決定啟停worker
mk-worker函數(shù)用來啟動worker
? ? * worker之間互連并啟動一個線程監(jiān)控變化淑趾,如果worker任務(wù)變更會與啟停worker重連
? ? * 監(jiān)控topology是否active并將這個狀態(tài)賦給storm-active-atom變量阳仔,task根據(jù)這個變量決定是否調(diào)用spouts的nextTuple
? ? * worker啟動線程來執(zhí)行具體的tasks
mk-task函數(shù)用來啟動task
? ? * task啟動一個routing函數(shù),接收stream輸出tuple返回task ids(用來發(fā)送tuple)
? ? * task執(zhí)行spout和bolt業(yè)務(wù)邏輯
Topology監(jiān)控
nimbus對topology的整個生命周期進行監(jiān)控
? ? * 定時線程執(zhí)行日常任務(wù)的檢查
? ? * nimbus按一個有限狀態(tài)機轉(zhuǎn)動扣泊,包含:active\inactive\killed\rebalancing五個狀態(tài)
? ? * nimbus.monitor.freq.secs設(shè)定檢測周期近范,調(diào)用reassign-topology觸發(fā)monitor事件完成
? ? * reassign-topology調(diào)用mk-assignments來執(zhí)行topology的更新,更新時會啟停workers
殺掉Topology
storm kill調(diào)用nimbus thrift接口完成這個任務(wù)延蟹,可以用-w 指定remove topology的timeout评矩,
也給workers時機來處理完正在執(zhí)行的指令。kill命令是fault-tolerant的阱飘,當(dāng)nimbus恢復(fù)時會remove killed狀態(tài)的topology斥杜,之后刪除zk中該topology的信息和心跳目錄\jars\configs,這個由單獨的線程do-cleanup 完成