以ReachTopology為例子较屿,這個(gè)類(lèi)在storm-starter里动看。
LinearDRPC顧名思義是線性的懒闷,bolt的處理流程是linear的宪赶。
重要的方法
1.StormTopologycreateTopology(DRPCSpout spout)
linearDRPC核心的防范就是創(chuàng)建createTopology巩趁,方法里首先會(huì)給DRPCSpout賦予一個(gè)spoutid痒玩,然后創(chuàng)建一個(gè)bolt,這個(gè)bolt接受spout的stream议慰。
bolt的類(lèi)型是PrepareRequest蠢古,PrepareRequest會(huì)發(fā)射三個(gè)stream,分別是參數(shù)(args)别凹,返回信息(returnInfo)和請(qǐng)求id(requestID)
接下來(lái)草讶,會(huì)遍歷topo中的每一個(gè)組件(component),遍歷組件的過(guò)程主要做了幾個(gè)事情:
1. 對(duì)每一個(gè)組件用BoltDeclarer包裝一下
2. 遍歷每一個(gè)component的group(比如shuffle group, field group, direct etc)付給新創(chuàng)建的BoltDeclarer
在這之后炉菲,每個(gè)linearDrpcBuilder都會(huì)有一個(gè)JoinResult和ReturnResult的bolt堕战,其中JoinResult接受Prepare發(fā)出的request stream和最后一個(gè)bolt發(fā)出的第一個(gè)fields。
并將二者做join生成json颁督。ReturnResult獲得JoinResult的結(jié)果践啄,并且通過(guò)drpc的result的方法放到result的隊(duì)列。
從數(shù)據(jù)流轉(zhuǎn)的角度沉御,請(qǐng)求如下進(jìn)行:
1. 首先屿讽,客戶(hù)端通過(guò)execute方法,將請(qǐng)求提交給drpc server(server有clojure實(shí)現(xiàn),請(qǐng)見(jiàn)drpc.clj)
2. 從server的角度來(lái)說(shuō)伐谈,topo提交后烂完,就會(huì)自動(dòng)開(kāi)始計(jì)算(參見(jiàn)上面的linearDrpcBuilder中的實(shí)現(xiàn)方式)。
3. DrpcSpout從server中獲取request(詳見(jiàn)drpc.clj的fetchRequest方法)
4. 計(jì)算結(jié)果最終放到另一個(gè)隊(duì)列诵棵,并由execute返回
注意這里用信號(hào)量來(lái)控制時(shí)序抠蚣, execute用信號(hào)量的許可為0來(lái)做初始化,這樣會(huì)導(dǎo)致execute阻塞
而result方法會(huì)對(duì)相應(yīng)的信號(hào)量做釋放