Storm架構(gòu)
Storm是一個(gè)分布式料扰、可靠的實(shí)時(shí)計(jì)算系統(tǒng)。與Hadoop不同的是,它采用流式的消息處理方法悴能,對(duì)于每條消息輸入到系統(tǒng)中后就能被立即處理。適用于一些對(duì)實(shí)時(shí)性要求高的場(chǎng)景雳灾,比如廣告點(diǎn)擊在線統(tǒng)計(jì)漠酿、交易額實(shí)時(shí)統(tǒng)計(jì)等寻狂。
一些名詞解釋
Stream:Storm中被處理的數(shù)據(jù)流眼刃,一條消息稱為一個(gè)元組锁孟。
Spout:Storm連接外部數(shù)據(jù)源的組件芭届,可以認(rèn)為Storm的數(shù)據(jù)源谋旦。
Bolt:數(shù)據(jù)處理組件褐桌,Bolt里面封裝了處理數(shù)據(jù)的邏輯照雁。Spout和Bolt是Storm中的兩類組件秋泄,類似MapReduce中的Map和Reduce阱持。比如可以在Bolt上定義過濾夭拌、聚合、join衷咽、寫數(shù)據(jù)庫等鸽扁。
-
Stream Group:消息分組策略,定義了Bolt組件以何種方式接收數(shù)據(jù)镶骗。Storm內(nèi)置了八種消息分組策略桶现,我們也可以通過實(shí)現(xiàn)CustomStreamGrouping定義自己的消息分組策略。
- Shuffle grouping:隨機(jī)分配消息給Bolt task鼎姊,能夠保證每個(gè)Bolt task都能分配到相同數(shù)據(jù)量的元組骡和。
- Fileds grouping:根據(jù)字段進(jìn)行劃分相赁,比如按“user-id”字段進(jìn)行劃分,那么相同“user-id”的值會(huì)被分配到一個(gè)Bolt task中慰于。
- Partial grouping:類似Filed grouping噪生,但是能夠保證下游Bolt任務(wù)負(fù)載均衡。
- All grouping:將每條消息都廣播給所有Bolt任務(wù)东囚,也就是說每個(gè)Bolt處理的數(shù)據(jù)完全相同跺嗽。需要小心使用。
- Global grouping:所有消息流數(shù)據(jù)全部發(fā)送到一個(gè)Bolt任務(wù)中页藻。
- None grouping:不關(guān)心分組策略桨嫁,相當(dāng)于Shuffle grouping。
- Direct grouping:直接分組份帐,上游指定哪個(gè)Bolt任務(wù)接受數(shù)據(jù)璃吧。
- Local or shuffle grouping:資源本地化的一種實(shí)現(xiàn)方式,如果任務(wù)都在同一個(gè)進(jìn)程中废境,則會(huì)發(fā)送到該Bolt任務(wù)中畜挨。入果沒有,相當(dāng)于shuffle grouping噩凹。
Topology:由消息分組將Spout和Bolt連接起來的任務(wù)拓?fù)浒驮O喈?dāng)于MapReduce中Map和Reduce組成的任務(wù)。
Worker:工作進(jìn)程驮宴,運(yùn)行在Supervisor節(jié)點(diǎn)上逮刨,一個(gè)Woker進(jìn)程可以包含一個(gè)或多個(gè)Executor線程。每個(gè)Woker進(jìn)程上會(huì)執(zhí)行一組Task堵泽,比如Storm集群總共有50個(gè)Woker修己,如果Task總數(shù)為300,那么每個(gè)Worker上面需要執(zhí)行6個(gè)Task迎罗。Worker執(zhí)行topology的一個(gè)子集(不會(huì)出現(xiàn)一個(gè)Worker進(jìn)程為多個(gè)Topology提供服務(wù))睬愤,一個(gè)Topology任務(wù)可能由多個(gè)Woker進(jìn)程負(fù)責(zé)執(zhí)行。
Executor:執(zhí)行Spout或Bolt任務(wù)的線程纹安,由Worker進(jìn)程創(chuàng)建尤辱。每個(gè)Executor線程只會(huì)執(zhí)行一個(gè)Topology中的一個(gè)Component的task實(shí)例(但不一定只執(zhí)行一個(gè)task,可能執(zhí)行多個(gè)task)钻蔑。
Task:Storm中最小的處理單元啥刻,一個(gè)Executor中可以運(yùn)行一個(gè)或多個(gè)Task。Topology中的Spout和Bolt可以設(shè)置并行度咪笑,一個(gè)并行度對(duì)應(yīng)一個(gè)Task可帽。
一個(gè)Topology任務(wù)啟動(dòng)后,組件(Spout或Bolt)的task數(shù)量就已經(jīng)確定了(就是組件的并行度)窗怒。但是我們可以為該組件添加執(zhí)行線程映跟,也就是Executor(因?yàn)橛锌赡芤粋€(gè)Executor執(zhí)行了多個(gè)task蓄拣,為了提高執(zhí)行效率,可以增加Executor線程努隙。但是需要注意球恤,一個(gè)Executor只會(huì)為同一個(gè)Component的task服務(wù))。默認(rèn)情況下荸镊,task數(shù)是等于Executor數(shù)的咽斧,即一個(gè)Executor執(zhí)行一個(gè)task。
Storm架構(gòu)
Storm集群有兩類節(jié)點(diǎn):運(yùn)行Nimbus守護(hù)進(jìn)程的主節(jié)點(diǎn)和運(yùn)行Supervisor守護(hù)進(jìn)程的工作節(jié)點(diǎn)躬存。Nimbus節(jié)點(diǎn)用于分配代碼张惹、分配計(jì)算任務(wù)(分配給哪些Supervisor上的哪些Woker)和監(jiān)控狀態(tài)(用于故障檢測(cè)、恢復(fù))岭洲。Supervisor節(jié)點(diǎn)負(fù)責(zé)監(jiān)聽工作(監(jiān)聽Nimbus分配的任務(wù))宛逗、啟動(dòng)并停止Woker進(jìn)程。
Worker是運(yùn)行在Supervisor上的進(jìn)程盾剩,Supervisor收到Nimbus分配的任務(wù)后雷激,負(fù)責(zé)啟動(dòng)Nimbus指定的Woker進(jìn)程。Woker進(jìn)程執(zhí)行Topology的子集告私,一個(gè)Topology任務(wù)可能由運(yùn)行在多臺(tái)機(jī)器上的Worker進(jìn)程組成屎暇。
Woker進(jìn)程啟動(dòng)一個(gè)或多個(gè)Executor線程,Executor線程中可以有一個(gè)或多個(gè)Task德挣。每個(gè)Executor都會(huì)啟動(dòng)一個(gè)消息循環(huán)線程恭垦,用于接受快毛、處理和發(fā)送消息格嗅。
Nimbus和Supervisor之間協(xié)調(diào)工作也是由Zookeeper來完成的。
Nimbus和Supervisor都能快速失敗恢復(fù)唠帝,而且它們都是無狀態(tài)的屯掖,狀態(tài)信息存儲(chǔ)在Zookeeper(元數(shù)據(jù))和本地中。當(dāng)Nimbus或Supervisor掛掉后襟衰,可以重新啟動(dòng)并讀取狀態(tài)信息到集群中來正常運(yùn)行贴铜,所以Storm系統(tǒng)具有很高的容錯(cuò)性。
在邏輯上將Storm中消息來源節(jié)點(diǎn)稱為Spout瀑晒,消息處理節(jié)點(diǎn)稱為Bolt绍坝,它們通過流分組組成Topology。
Storm中元數(shù)據(jù)
為了更好的理解Storm的設(shè)計(jì)苔悦,我們可以通過Zookeeper中存儲(chǔ)的元數(shù)據(jù)來理解Storm架構(gòu)中各個(gè)節(jié)點(diǎn)之間的關(guān)系轩褐。
Storm在Zookeeper存儲(chǔ)的消息都經(jīng)都是以/storm開始,所有數(shù)據(jù)都存儲(chǔ)在葉子節(jié)點(diǎn)上玖详。下面說一個(gè)每個(gè)節(jié)點(diǎn)數(shù)據(jù)存儲(chǔ)的具體含義:
- /storm/workerbeats/{topology-id}/node-port:拓?fù)淙蝿?wù)所在的Woker進(jìn)程信息把介,node和port是Woker所在的主機(jī)和端口勤讽。里面主要存儲(chǔ)了Woker的運(yùn)行狀態(tài)和統(tǒng)計(jì)信息。Woker進(jìn)程會(huì)定時(shí)上報(bào)心跳到該節(jié)點(diǎn)拗踢,Nimbus通過心跳信息來確認(rèn)Woker進(jìn)程的存活脚牍,對(duì)于死掉的Woker,Nimbus會(huì)重新調(diào)度巢墅。統(tǒng)計(jì)信息包括該Woker上所有Executor的統(tǒng)計(jì)信息诸狭,比如發(fā)送消息數(shù),接受消息數(shù)等君纫,這些信息會(huì)顯示在UI中作谚。一個(gè)Topology任務(wù)可能劃分到多個(gè)Woker(node-port)上。
- /storm/storms/{topolog-id}:存儲(chǔ)了拓?fù)淙蝿?wù)的本身信息庵芭,比如名字妹懒、啟動(dòng)時(shí)間、運(yùn)行狀態(tài)双吆、使用Woker數(shù)眨唬、每個(gè)組件的并行度等。這個(gè)信息在任務(wù)注冊(cè)后好乐,就不會(huì)在發(fā)生改變匾竿。該節(jié)點(diǎn)信息,可以幫助Nimbus進(jìn)行資源分配蔚万,因?yàn)槟軌蛑滥男㏒upervisor上面有哪些任務(wù)岭妖。
- /storm/assignments/{topology-id}:Nimbus為每個(gè)Topology任務(wù)的分配信息,比如Topology在該Nimbus中的存儲(chǔ)目錄反璃、被分配給哪些Supervisor昵慌、Woker、Executor上淮蜈。
- /storm/supervisors/{supervisor-id>}:Supervisor節(jié)點(diǎn)的注冊(cè)信息斋攀,存儲(chǔ)了該節(jié)點(diǎn)自身一些統(tǒng)計(jì)信息,比如使用了哪些端口等梧田。該Znode是臨時(shí)節(jié)點(diǎn)淳蔼,當(dāng)Supervisor下線后,這些信息會(huì)自動(dòng)刪除裁眯,Nimbus感知到該節(jié)點(diǎn)下線后會(huì)重新為該Supervisor下的任務(wù)分配節(jié)點(diǎn)鹉梨。
- /storm/errors/{topology-id}/{component-id}/{equential-id}:存儲(chǔ)運(yùn)行中每個(gè)組件的錯(cuò)誤信息,每個(gè)組件只會(huì)保留最近十條錯(cuò)誤信息穿稳。
下面一張圖講述了節(jié)點(diǎn)直接如何創(chuàng)建存皂、使用這些元數(shù)據(jù):
Nimbus、Supervisor司草、Woker兩兩之間都需要維持心跳艰垂。Nimbus通過/storm/superviors/節(jié)點(diǎn)能夠知道哪些Supervisor存活泡仗。Nimbus通過/storm/workerbeats/節(jié)點(diǎn)能夠知道哪些Woker存活。Supervisor和Woker通過本地文件維持心跳猜憎,他們雖然是兩個(gè)進(jìn)程無法直接通信娩怎,為什么通過文件維持心跳呢,應(yīng)該有很多網(wǎng)絡(luò)通信框架可以吧胰柑。
Storm源碼
Storm基于Clojure和Java編寫的截亦。其中Clojure實(shí)現(xiàn)了Nimbus、Supervisor柬讨、Woker崩瓤、Executor以及Task這些基礎(chǔ)服務(wù)。Java實(shí)現(xiàn)了Storm的流處理(組件實(shí)現(xiàn))以及事務(wù)Topology踩官。
Storm中還有一些Trident代碼却桶,Trident是Storm實(shí)時(shí)消息處理的更高層抽象。
Storm集群搭建
環(huán)境準(zhǔn)備
1蔗牡、Java安裝
Storm1.x在Java7和Java8中完成了測(cè)試颖系,所以Java版本最好是1.7+。
2辩越、Python安裝
Storm1.x在Python2.6.6中完成了測(cè)試嘁扼,理論上3.x也能夠運(yùn)行,但是官方并沒有測(cè)試黔攒。
3趁啸、Zookeeper安裝
Storm使用Zookeeper進(jìn)行協(xié)調(diào)服務(wù),所以需要準(zhǔn)備Zookeeper環(huán)境督惰,版本官方上面沒有指定版本不傅。
Storm安裝
下載Storm jar包
http://storm.apache.org/downloads.html
Storm文件配置
Storm配置文件在${STORM_HOME}/conf/storm.yaml中,下面是一些必須的配置項(xiàng):
#zookeeper集群
storm.zookeeper.servers:
- "192.168.0.1"
- "192.168.0.2"
- "192.168.0.3"
#zookeeper端口
storm.zookeeper.port: 2181
#可作為nimbus的候選主機(jī)
nimbus.seeds: ["192.168.0.1"]
#storm數(shù)據(jù)存儲(chǔ)目錄姑丑,用于存儲(chǔ)少量狀態(tài)信息蛤签,比如jar、conf等
storm.local.dir: "/opt/yjz/storm/data"
#suppervisor可以作為woker進(jìn)程啟動(dòng)的端口栅哀,表明該Supervisor最多可以啟動(dòng)四個(gè)Worker進(jìn)程
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
更多配置可以查看https://github.com/apache/storm/blob/v1.2.2/conf/defaults.yaml。隨著深入了解称龙,可以優(yōu)化集群配置留拾。
節(jié)點(diǎn)啟動(dòng)
nohub bin/storm nimbus &
nohub bin/storm ui &
nonub bin/storm supervisor &
查看nimbus節(jié)點(diǎn),發(fā)現(xiàn)nimbus進(jìn)程已經(jīng)啟動(dòng)鲫尊,core為ui進(jìn)程痴柔。
jps
8134 nimbus
8713 core
10123 Jps
19710 QuorumPeerMain
查看supervisor節(jié)點(diǎn),發(fā)現(xiàn)supervisor進(jìn)程已經(jīng)啟動(dòng)疫向。
jps
19237 QuorumPeerMain
7610 Supervisor
8975 Jps
查看UI咳蔚,192.168.0.1:8080豪嚎。
Storm編程
編程模型
我們上面說過Storm提供了兩類組件:Spout和Bolt。所以我們使用Storm編程的對(duì)象主要也就是針對(duì)Spout和Bolt谈火。
Spout用于定義接受外部數(shù)據(jù)源數(shù)據(jù)侈询,并將其轉(zhuǎn)換成Storm內(nèi)部數(shù)據(jù),然后將這些數(shù)據(jù)發(fā)送給Bolt糯耍。
Bolt組件定義了數(shù)據(jù)處理邏輯扔字,也就是我們的業(yè)務(wù)處理邏輯,比如過濾温技、聚合革为、Join等。Bolt組件也可以用于寫入外部介質(zhì)舵鳞,比如寫入Mysql震檩、Redis等。
組件之間傳輸?shù)臄?shù)據(jù)在Storm中稱為元組(Tuple蜓堕,可以理解為一行數(shù)據(jù))恳蹲,Tuple是Storm數(shù)據(jù)傳輸?shù)幕締卧uple中定義了字段(Field俩滥,可以理解一行數(shù)據(jù)中一個(gè)字段)嘉蕾,這些Field是有Schema的,F(xiàn)iled可以是byte霜旧、short错忱、integer、long挂据、float以清、double、boolean崎逃、string和byte array掷倔,除了這些基礎(chǔ)類型,我們還可以自定義數(shù)據(jù)類型(需要自己實(shí)現(xiàn)針對(duì)自定義類型的序列化)个绍。
Storm 組件
在編寫Storm作業(yè)時(shí)我們會(huì)針對(duì)Spout和Bolt進(jìn)行編程實(shí)現(xiàn)勒葱,下面我們分別看一下Storm為我們提供的組件接口。
Spout組件
Spout是Storm中Topology的生產(chǎn)者巴柿,負(fù)責(zé)讀取外部數(shù)據(jù)凛虽,并轉(zhuǎn)換成Tuple。Spout可以設(shè)置處理的消息類型為可靠或不可靠广恢。對(duì)可靠的消息凯旋,Spout會(huì)緩存發(fā)出去的消息,當(dāng)該消息在topology中處理失敗時(shí),Spout可以重新發(fā)送該消息至非。對(duì)于不可靠的消息钠署,Spout一旦發(fā)送出該消息后就會(huì)將該消息扔掉,所以如果該消息處理失敗荒椭,那么該消息就會(huì)丟失谐鼎。
Spout可以發(fā)射多條消息流Stream,通過使用OutputFieldsDeclarer.declareStream()
方法來定義多個(gè)Stream戳杀,然后使用SpoutOutputCollector.emit()
方法在發(fā)射消息時(shí)指定Stream该面。
Spout中最重要的方法nextTuple()
,將外部數(shù)據(jù)源數(shù)據(jù)以tuple形式發(fā)送到Topology中的Bolt組件進(jìn)行處理信卡。需要注意nextTuple()不能阻塞隔缀,因?yàn)镾torm是在一個(gè)線程內(nèi)調(diào)用Spout的所有方法。
Spout對(duì)于可靠類型消息傍菇,還有兩個(gè)比較重要的方法就是ack和fail猾瘸。Storm在檢測(cè)一個(gè)tuple被整個(gè)Topology執(zhí)行成功的時(shí)候會(huì)回調(diào)ack,否則調(diào)用fail丢习。
下面是Spout組件在Storm中的實(shí)現(xiàn)牵触,我們分別來看下這些組件都是什么。
IComponent接口
IComponent接口是所有組件的頂級(jí)接口咐低,IComponent中定義了所有組件可能需要用到的方法(其實(shí)就是定義了Spout和Bolt組件都用得到的方法)揽思。
- declareOutputFields方法用于聲明該拓?fù)渌辛?Stream)的輸出模式(Schema),比如聲明輸出流ID见擦、輸出字段(Field)以及輸出流是否為直接流钉汗。
- getComponentConfiguration方法用于聲明該組件的配置,但是它只能覆蓋以"topology.*"配置為開頭的子集鲤屡。我們也可以通過TopologyBuilder構(gòu)建拓?fù)鋾r(shí)损痰,對(duì)組件進(jìn)行進(jìn)一步配置覆蓋。
ISpout接口
ISpout是Spout組件的定義的頂級(jí)接口酒来,它定義了Spout組件支持的方法卢未。Storm會(huì)在相應(yīng)的階段調(diào)用ISpout接口中特定的方法,比如啟動(dòng)拓?fù)鋾r(shí)會(huì)首先調(diào)用open()方法堰汉,正常停止拓?fù)鋾r(shí)會(huì)調(diào)用close()方法(kill -9不會(huì)被調(diào)用)辽社。
- open方法會(huì)在任務(wù)在集群工作進(jìn)程初始化時(shí)調(diào)用,用于提供Spout執(zhí)行所需要的環(huán)境衡奥。比如讀取外部數(shù)據(jù)源的一些初始化配置可以寫在這里爹袁。Map類型的conf參數(shù)是這個(gè)Spout的配置,它包含了拓?fù)渑c集群配置的合并集矮固。TopologyContext類型的context是該拓?fù)涞纳舷挛模ㄍ負(fù)鋓d、組件id档址、輸入輸出信息等盹兢。SpoutOutputCollector類型的collector參數(shù)是Spout的收集器,用于發(fā)射tuple守伸。
- close方法當(dāng)一個(gè)ISpout關(guān)閉時(shí)會(huì)被調(diào)用绎秒,但是并不能保證一定被調(diào)用,比如Supervisor被kill -9強(qiáng)制殺死的時(shí)候尼摹。
- active方法是Spout從失效模式到激活狀態(tài)時(shí)被調(diào)用见芹。Spout可以處于失效狀態(tài)或激活狀態(tài),處于失效狀態(tài)的Spout不會(huì)調(diào)用nextTuple方法蠢涝。從失效狀態(tài)到激活狀態(tài)調(diào)用nextTuple方法前玄呛,會(huì)調(diào)用active方法。
- deactive方法是當(dāng)Spout失效時(shí)被調(diào)用和二,失效狀態(tài)的Spout不會(huì)調(diào)用nextTuple方法徘铝。
- nextTuple方法是Spout組件最重要的方法,nextTuple用于讀取外部數(shù)據(jù)源數(shù)據(jù)轉(zhuǎn)換為tuple惯吕,并且通過SpoutOutputCollector收集器發(fā)射tuple惕它。由于nextTuple、ack废登、fail方法都是在一個(gè)線程內(nèi)被調(diào)用淹魄,所以nextTuple方法不應(yīng)該有阻塞代碼。
- ack方法是當(dāng)Storm確定從該Spout發(fā)射出去標(biāo)識(shí)符為msgId的消息被topology完整處理完成時(shí)堡距,會(huì)調(diào)用ack方法甲锡,我們可以在這里實(shí)現(xiàn)一些邏輯,比如從我們的數(shù)據(jù)源隊(duì)列中移除該消息吏颖。
- fail方法和ack方法相反搔体,當(dāng)msgId消息在topology中被處理失敗時(shí)會(huì)被調(diào)用。
Spout的可靠性消息類型不需要我們通過fail方法實(shí)現(xiàn)半醉,Storm會(huì)自動(dòng)實(shí)現(xiàn)疚俱。ack和fail方法只是給我們獲取消息被處理成功與否的接口。
IRichSpout接口
IRichSpout接口繼承了ISpout和IComponent接口缩多,它是我們實(shí)現(xiàn)Spout組件的主要接口呆奕。它本身沒有定義任何方法,所以我們實(shí)現(xiàn)Spout組件時(shí)候只需要實(shí)現(xiàn)ISpout和IComponent接口的方法衬吆。
BaseComponent抽象類
BaseComponent抽象類實(shí)現(xiàn)實(shí)現(xiàn)了IComponent組件的getComponentConfiguration方法梁钾,并且返回為空。
它主要的作用就是對(duì)于一些Spout組件并不需要進(jìn)行覆蓋配置逊抡,這時(shí)候通過繼承BaseComponent抽象類就不需要實(shí)現(xiàn)該方法了姆泻。
BaseXxx在Storm組件中零酪,一般都是指一些基礎(chǔ)實(shí)現(xiàn)。目的就是為了避免我們寫代碼時(shí)候去實(shí)現(xiàn)一些我們用不到的方法(我們一般都是置為空)拇勃。
BaseRichSpout抽象類
BaseRichSpout繼承了IRichSpout接口和BaseComponent接口四苇,它實(shí)現(xiàn)了一些我們?cè)趯?shí)際編寫Spout組件時(shí)可能用不到的方法。比如close方咆、active月腋、deactive、ack瓣赂、fail方法(方法體都為空)榆骚。這樣當(dāng)我們通過繼承BaseRichSpout抽象類來實(shí)現(xiàn)Spout組件時(shí)候,這些方法我們都可以不需要實(shí)現(xiàn)了煌集。
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
}
總結(jié)
當(dāng)我們編寫Spout組件時(shí)妓肢,如果想要實(shí)現(xiàn)Spout給我們提供的所有方法,可以直接實(shí)現(xiàn)接口IRichSpout接口牙勘。如果我們只需要實(shí)現(xiàn)Spout組件所必須的方法职恳,可以直接繼承BaseRichSpout抽象類。
Bolt組件接口
所有的消息處理邏輯被封裝到Bolt中方面,比如可以用來做過濾放钦、聚合、查詢數(shù)據(jù)庫恭金、寫入數(shù)據(jù)等操禀。
Bolt也可以發(fā)送多條消息流Stream,使用OutputFieldsDeclarer.declareStream()
方法定義Stream横腿,然后使用OutputCollector.emit()
方法在發(fā)射消息時(shí)颓屑,指定Stream。
Bolt最重要的方法時(shí)execute()耿焊,它以接受一個(gè)tuple揪惦,經(jīng)過邏輯處理后,使用OutputCollector.emit()
發(fā)射出0個(gè)或多個(gè)tuple罗侯。Bolt還需要為每個(gè)tuple調(diào)用ack方法器腋,來通知Storm這個(gè)消息被該Bolt task執(zhí)行完成,從而通知這個(gè)tuple的發(fā)射者Spout(調(diào)用Spout的ack或fail)钩杰。
Bolt組件定義的邏輯和Spout組件的類似纫塌,下面是Bolt組件的實(shí)現(xiàn)方式。
IBolt接口
IBolt接口是Bolt組件的頂級(jí)接口讲弄,它定義了Bolt組件所需要的方法措左。IBolt的設(shè)計(jì)原則是以一個(gè)元組作為輸入,通過邏輯處理生成零個(gè)或多個(gè)元組輸出避除。
- prepare方法在拓?fù)渥鳂I(yè)在工作進(jìn)程初始化時(shí)調(diào)用怎披。和ISpout中的open方法一樣胸嘁,做一些組件初始化的工作。prepare方法同樣提供了三個(gè)stormConf钳枕、context和collector缴渊,用途和ISpout中的open方法一樣赏壹。需要注意Bolt中的collector的類型是OutputCollector鱼炒。
- execute方法是Bolt組件最重要的方法,接收上游發(fā)送的元組蝌借,執(zhí)行業(yè)務(wù)邏輯昔瞧,然后通過OutputCollector來向下游發(fā)射元組。在執(zhí)行完execute方法后菩佑,我們應(yīng)該調(diào)用ack或fail方法來通知Storm該消息已經(jīng)被處理(否則Storm一直會(huì)等到消息處理超時(shí)自晰,才認(rèn)為該消息處理失敗)。
- cleanup方法當(dāng)IBolt被即將關(guān)閉時(shí)調(diào)用稍坯,和ISpout的close方法一樣酬荞,不保證一定被執(zhí)行到。
execute中的tuple可以不立即執(zhí)行瞧哟,可以等待其它元組到來一起執(zhí)行混巧。比如做聚合、join等操作勤揩。
IRichBolt接口
IRichBolt接口實(shí)現(xiàn)了IBolt和IComponent接口咧党,它的作用和ISpout接口一樣,這里就不在復(fù)述了陨亡。
BaseRichBolt抽象方法
BaseRichBolt抽象類和BaseRichSpout抽象方法一樣傍衡,對(duì)一些我們可能用不到的方法,提供默認(rèn)實(shí)現(xiàn)负蠕。
public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
@Override
public void cleanup() {
}
}
IBasicBolt蛙埂、BaseBasicBolt
使用IBasicBolt和BaseBasicBolt類來實(shí)現(xiàn)Bolt組件,我們可以不需要手動(dòng)ack遮糖,IBasicBolt的execute方法會(huì)自動(dòng)執(zhí)行Acking機(jī)制(仔細(xì)看它使用的是BasicOutputCollector在emit后绣的,執(zhí)行ack方法)。如果我們希望該元組失敗止吁,可以顯示拋出一個(gè)FailedException
異常被辑。
IBasicBolt接口方法和IRichBolt具有相同的方法,只是prepare中沒有傳遞OutputCollector收集器了敬惦,而是在execute方法中直接傳遞了BasicOutputCollector盼理。所以如果我們不需要對(duì)該Bolt組件添加配置和獲取拓?fù)渖舷挛膶?duì)象,可以直接實(shí)現(xiàn)BaseBasicBolt抽象類俄删,因?yàn)樵擃愄峁┝藀repare和cleanup的默認(rèn)實(shí)現(xiàn)宏怔,我們只需要實(shí)現(xiàn)execute方法即可奏路。
總結(jié)
如果我們不想要手動(dòng)調(diào)用ack,可以繼承IBasicBolt或BaseBasickBolt來實(shí)現(xiàn)Bolt組件臊诊。當(dāng)然鸽粉,如果想要顯示靈活調(diào)用,可以通過繼承IRichBolt或BaseRichBolt來實(shí)現(xiàn)Bolt組件抓艳。
Stream Groupings
我們上面說過Topology是通過Stream Grouping將Spout和Bolt組合而成的触机。無論Bolt還是Spout我們都可以為其設(shè)置并行度,并行度對(duì)應(yīng)著task玷或,而Stream Grouping定義了該Bolt中的所有task以什么形式來接受數(shù)據(jù)流儡首。Stream Grouping支持的八種流分組方式在上面已經(jīng)說過了,這里就不說了偏友。
Topology配置
當(dāng)我編寫完Spout和Bolt組件后蔬胯,需要提供一個(gè)主類來設(shè)置拓?fù)洹_@個(gè)主類也是Storm執(zhí)行Topology任務(wù)的入口類位他。
并行度
在說Topology配置前氛濒,我們先理解一下Storm中的并行度。
在Storm中運(yùn)行Topology任務(wù)主要依賴下面三個(gè)實(shí)體:
- 工作進(jìn)程(Worker processes)
- 執(zhí)行線程(Executor)
- 任務(wù)(Task)
一個(gè)工作進(jìn)程運(yùn)行一個(gè)Topology的子集鹅髓,并且每個(gè)工作進(jìn)程只屬于一個(gè)Topology任務(wù)(不會(huì)存在一個(gè)Woker服務(wù)多個(gè)Topology)舞竿,一個(gè)Topology任務(wù)可能由多臺(tái)機(jī)器的多個(gè)Woker組成。
Woker進(jìn)程中可以啟動(dòng)多個(gè)Executor線程迈勋,每個(gè)Executor線程運(yùn)行一個(gè)或多個(gè)Task炬灭,但是這些task必須是同一Component中的。也就是每個(gè)Executor只能服務(wù)于一個(gè)Component靡菇。
Task是具體執(zhí)行數(shù)據(jù)處理的重归,我們實(shí)現(xiàn)的Bolt或Spout組件中每個(gè)組件可以啟動(dòng)一個(gè)或多個(gè)task來執(zhí)行,以達(dá)到提高處理效率的目的厦凤。Task數(shù)在Topology啟動(dòng)后就不能在改變了鼻吮,但是我們可以修改執(zhí)行Task的Executor線程數(shù),來動(dòng)態(tài)調(diào)整為該拓?fù)浞峙涞馁Y源较鼓。默認(rèn)情況下椎木,每個(gè)Executor會(huì)對(duì)應(yīng)一個(gè)task。
配置實(shí)例
我們首先需要使用TopologyBuilder來配置拓?fù)潢P(guān)系博烂,在設(shè)置過程中可以添加配置香椎、設(shè)置組件執(zhí)行Executor數(shù)量、設(shè)置組件task數(shù)以及設(shè)置Stream Grouping禽篱。
public static void main(String[] args) throws Exception{
//設(shè)置Component(bolt和spout)之間的拓?fù)浣Y(jié)構(gòu)
TopologyBuilder builder = new TopologyBuilder();
//添加spout組件
builder.setSpout("word-reader",new WordReaderSpout()).addConfiguration("topology.debug",true);
//添加bolt組件畜伐,并設(shè)置組件所需task數(shù),這里沒有設(shè)置Executor數(shù)量躺率,默認(rèn)會(huì)為每個(gè)task啟動(dòng)一個(gè)Executor線程玛界。這里還設(shè)置了以shuffleGrouping分組方式接收上游word-reader組件發(fā)送的消息
builder.setBolt("word-normalizer",new WordNormalizerBolt()).setNumTasks(2).shuffleGrouping("word-reader");
//設(shè)置bolt組件万矾,并且設(shè)置了啟動(dòng)Executor數(shù),這里沒有設(shè)置task數(shù)量慎框,默認(rèn)會(huì)為么給Executor分配一個(gè)task良狈。這里還設(shè)置了以shuffleGrouping分組方式接收上游word-normailizer組件發(fā)送的消息
builder.setBolt("word-counter",new WordCounterBolt()).shuffleGrouping("word-normalizer",3);
//運(yùn)行時(shí)與集群配置合并,并通過prepare或open方法發(fā)送給所有組件節(jié)點(diǎn)
Config conf = new Config();
//設(shè)置運(yùn)行該拓?fù)湫枰獛讉€(gè)Worker進(jìn)程笨枯,如果沒有設(shè)置默認(rèn)為1個(gè)薪丁。
conf.setNumWorkers(2);
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS,10);
conf.put("word-file",args[0]);
conf.setDebug(false);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
//使用集群方式提交拓?fù)鋱?zhí)行
StormSubmitter.submitTopology("storm-hello-word",conf,builder.createTopology());
}
如果集群中Woker數(shù)量被用完(storm.yaml中設(shè)置的supervisor.slots.ports),在提交新Topology時(shí)會(huì)失敗猎醇。需要等待運(yùn)行中的Topology釋放資源后才可以執(zhí)行窥突。
更新運(yùn)行中的Topology的Executor
Storm提供了對(duì)運(yùn)行中Topology任務(wù)的Executor熱更新,有兩種方式可以進(jìn)行更新硫嘶。
- 使用Storm web UI中rebalance更新。
- 是用Storm命令行工具CLI進(jìn)行更新梧税。
比如將myTopology任務(wù)改為5個(gè)工作進(jìn)程沦疾,組件blue-spout使用3個(gè)Executor,組件yello-bolt使用5個(gè)bolt第队。
storm rebalance myTopology -n 5 -e blue-spout=3 -e yello-bolt=5
Storm配置文件
Storm有許多各種各樣的配置哮塞,有些是系統(tǒng)配置不能通過Topology任務(wù)進(jìn)行修改,而有些配置是支持在Topology任務(wù)中進(jìn)行修改凳谦。
默認(rèn)Storm中的所有配置都在Storm代碼庫中的default.yaml中忆畅,我們可以通過在Nimbus節(jié)點(diǎn)或Supervisor節(jié)點(diǎn)中的storm.yaml進(jìn)行覆蓋。除了通過storm.yaml進(jìn)行覆蓋修改配置外尸执,我們還可以通過StormSubmitter構(gòu)建拓?fù)鋾r(shí)來修改配置(傳入的Config對(duì)象)家凯,但是這里只能修改以TOPOLOGY為前綴的配置項(xiàng)。
我們也可以通過Java API修改配置如失,有兩種方式:
- 內(nèi)部修改:覆蓋Spout或Bolt的getComponentConfiguration方法來修改配置绊诲。
- 外部修改:在TopologyBuilder中的setSpout或setBolt方法返回的對(duì)象調(diào)用addConfiguration來覆蓋配置。
這些配置的優(yōu)先級(jí)為:
default.yaml < storm.yaml < 配置拓?fù)涮砑优渲庙?xiàng) < 組件內(nèi)部添加配置項(xiàng) < 組件外部添加配置項(xiàng)
Maven開發(fā)配置
使用Maven開發(fā)Storm作業(yè)褪贵,首先就需要配置pom.xml文件掂之,包括Jar包引入、打包配置等脆丁。
Storm目前版本是1.2.2世舰,Jar包引入:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<scope>provided</scope>
</dependency>
這里之所有使用provided模式是因?yàn)椋琒torm會(huì)自動(dòng)從工作節(jié)點(diǎn)下載Storm Jar包槽卫。
當(dāng)我們編寫完Storm作業(yè)后跟压,需要將相關(guān)依賴打到一個(gè)Jar包內(nèi)猫缭,可以使用assembly插件進(jìn)行打包濒蒋。
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.path.to.main.Class</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
其中mainClass就是我們作業(yè)的啟動(dòng)主類启盛。
本地模式
Storm為了方便開發(fā)測(cè)試凡桥,提供了本地模式運(yùn)行Storm Topology,本地模式模擬了集群模式下作業(yè)的執(zhí)行喷好,所以我們?cè)陂_發(fā)調(diào)試過程中可以使用本地模式測(cè)試開發(fā)的Storm作業(yè)翔横。
本地模式將Spout和Bolt都運(yùn)行在一個(gè)進(jìn)程上的多個(gè)線程執(zhí)行,來模擬真實(shí)的集群運(yùn)行情況梗搅。對(duì)于一些耗時(shí)操作禾唁,會(huì)采用Thread.sleep()方法模擬,所以有時(shí)會(huì)導(dǎo)致運(yùn)行速度緩慢无切。
//只需要?jiǎng)?chuàng)建LocalCluster對(duì)象類就可以使用本地模式
LocalCluster localCluster = new LocalCluster();
//提交作業(yè)
localCluster.submitTopology();
//停止作業(yè)
localCluster.killTopology();
//關(guān)閉本地集群模式
localCluster.shutdown();
Local模式提供了一下配置型:
Config.TOPOLOGY_MAX_TASK_PARALLELISM:設(shè)置Topology的最大并行度荡短。
Config.TOPOLOGY_DEBUG:開啟DEBUG模式,方便作業(yè)調(diào)試哆键。
Storm核心機(jī)制原理
Storm消息可靠性處理
Storm提供了幾種不同級(jí)別的消息可靠性處理:
- 盡力處理(best effort)
- 至少一次被處理(at least once)
- 恰好處理一次(exactly once,需要借助Trident)
盡力處理屬于最低級(jí)別保證機(jī)制掘托,我們可以不添加任何額外操作,Storm就能幫我們達(dá)到籍嘹。
至少一次被處理(at least once)
Spout發(fā)出的消息可能會(huì)產(chǎn)生成千上萬條消息(經(jīng)過各種Bolt task處理后就會(huì)分散出一條消息)闪盔,這些消息會(huì)組成一顆消息樹,其中Spout發(fā)出的消息為消息根辱士,Storm會(huì)跟蹤整棵樹的處理情況泪掀,如果這顆樹中的任一消息處理失敗,或者整棵樹在規(guī)定時(shí)間沒有被處理完(通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
配置颂碘,默認(rèn)為30s)异赫,那么Storm就認(rèn)為Spout發(fā)出的這些消息處理失敗了,Spout會(huì)重新發(fā)送該條消息头岔。
判斷一個(gè)tuple tree是否被處理完成塔拳,有以下兩個(gè)條件:
- tuple tree不在生長(zhǎng)。
- tuple tree中的任何消息都被處理切油。
使用Storm API來實(shí)現(xiàn)
如果想要使用Storm提供的可靠性處理蝙斜,我們需要做兩件事:
- 無論何時(shí),只要在tuple tree中創(chuàng)建了一個(gè)新節(jié)點(diǎn)澎胡,就需要告知Storm孕荠。
- 當(dāng)處理完一個(gè)消息后,需要告知Storm中對(duì)應(yīng)tuple tree攻谁。
通過上面兩個(gè)步驟稚伍,Storm就可以檢測(cè)一個(gè)tuple tree是否被處理完成,并且會(huì)調(diào)用消息產(chǎn)生對(duì)應(yīng)Spout的ack和fail方法戚宦。
當(dāng)為tuple tree中指定的節(jié)點(diǎn)增加一個(gè)新節(jié)點(diǎn)時(shí)个曙,稱為錨定(anchoring)。錨定是在發(fā)送消息的同時(shí)進(jìn)行的,具體錨定方式為:把輸入消息作為emit方法的第一個(gè)參數(shù)垦搬。這樣就告知tuple tree呼寸,該節(jié)點(diǎn)產(chǎn)生了新節(jié)點(diǎn),只有當(dāng)新節(jié)點(diǎn)也完成時(shí)tuple tree才算執(zhí)行完成猴贰。
public class SplitSentence extends BaseRichBolt {
...
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
...
Storm支持一個(gè)輸出消息被錨定在一個(gè)或多個(gè)輸入消息上对雪,比如join、聚合等場(chǎng)景米绕。一個(gè)被多重錨定的消息處理失敗瑟捣,會(huì)導(dǎo)致與之關(guān)聯(lián)的多個(gè)Spout重新發(fā)送消息。
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
多重錨定會(huì)將錨定的消息添加到多棵tuple tree上栅干,并且有可能打破樹結(jié)構(gòu)迈套,從而形成一個(gè)DAG圖。
如果沒有錨定碱鳞,也就是沒有在emit方法的第一個(gè)參數(shù)指定輸入tuple桑李。那么這個(gè)節(jié)點(diǎn)所產(chǎn)生的子樹失敗,spout不會(huì)重新發(fā)送消息劫笙。該節(jié)點(diǎn)ack完成后芙扎,tuple tree就認(rèn)為被處理完成了。有些場(chǎng)景非常適合這種不需要錨定的消息填大。
完成錨定后,我們還需要在消息被處理完成后告知tuple tree俏橘。我們必須在每個(gè)execute方法的后面顯示調(diào)用OutputCollector的ack或fail方法允华,來表明該消息在該bolt是否被處理完成(否則會(huì)一直等到超時(shí))。
顯示調(diào)用ack或fail寥掐,是除了快速告知tuple tree消息是否被處理完成外靴寂,還有一個(gè)原因就是防止內(nèi)存被打滿。因?yàn)镾torm使用內(nèi)存來跟蹤每個(gè)元組是否被處理完成召耘,所以如果不調(diào)用ack或fail百炬,很容易將內(nèi)存打滿。
Storm提供了IBasicBolt和BaseBasicBolt接口來隱式調(diào)用ACK機(jī)制污它,也就是說我們?nèi)绻褂盟鼈儗?shí)現(xiàn)Bolt組件剖踊,就不需要手動(dòng)錨定和調(diào)用ack/fail方法了。
public class SplitSentence extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
針對(duì)于多重錨定衫贬,IBasicBolt和BaseBasicBolt是無法處理的德澈。需要我們顯示完成,也就是需要實(shí)現(xiàn)IRichBolt或BaseRichBolt定義Bolt組件固惯。
acker框架
Storm使用Acker框架來跟蹤消息是否被成功處理梆造。Acker是Storm中一組特殊的任務(wù),用于跟蹤每個(gè)Spout發(fā)送tuple的DAG葬毫。當(dāng)acker發(fā)現(xiàn)DAG中節(jié)點(diǎn)都完全被處理完成后镇辉,它會(huì)向創(chuàng)建該tuple的Spout發(fā)送一條消息(成功或失敗)屡穗。
我們可以使用Config.TOPOLOGY_ACKERS
在拓?fù)渑渲弥性O(shè)置Acker數(shù)量,默認(rèn)情況下每個(gè)Woker進(jìn)程會(huì)啟動(dòng)一個(gè)Acker任務(wù)忽肛。
當(dāng)在Topology中創(chuàng)建一個(gè)新的元組時(shí)村砂,會(huì)為每個(gè)元組分配一個(gè)64bit的隨機(jī)id(無論spout還是bolt組件)。Acker使用這些id來跟蹤tuple tree麻裁。每個(gè)tuple被創(chuàng)建后tuple tree中的根id都會(huì)被復(fù)制到這個(gè)消息中箍镜,當(dāng)這個(gè)消息處理完成后,它會(huì)根據(jù)根id來找到跟蹤這棵樹的Acker煎源,并向該Acker發(fā)送狀態(tài)變更信息色迂,比如:該元組已經(jīng)處理完成,又產(chǎn)生了新元組手销,需要你跟蹤下歇僧。
這里有個(gè)點(diǎn)需要考慮下,就是每個(gè)元組如何知道自己的tuple tree對(duì)應(yīng)著哪個(gè)Acker锋拖。Storm使用一種哈希算法根據(jù)Spout tupel id來確定那個(gè)Acker負(fù)責(zé)該tuple tree诈悍,而每個(gè)消息都知道根id,因此就知道與哪個(gè)Acker通信了兽埃。
我們知道了Acker與Spout tuple對(duì)應(yīng)關(guān)系侥钳,知道了每個(gè)tuple tree 元組如何找到對(duì)應(yīng)的Acker與其通信。接下來還需要考慮一點(diǎn)柄错,Acker如何跟蹤tuple tree舷夺。
我們知道每個(gè)tuple tree都有可能有成千上萬個(gè)節(jié)點(diǎn),如果跟蹤每個(gè)節(jié)點(diǎn)售貌,那么內(nèi)存很容易就被打滿了给猾。Storm采用了一個(gè)不同的跟蹤策略,每個(gè)Spout元組只需要固定數(shù)據(jù)量的空間(大約20字節(jié))颂跨,就可以跟蹤tuple tree敢伸。這種跟蹤算法是Storm能夠正常工作的關(guān)鍵,也是其重大突破之一恒削。
我們看下Storm是如何做的池颈。Acker為每個(gè)Spout元組存儲(chǔ)一個(gè)消息ID(隨機(jī)分配的那個(gè)ID)到一對(duì)值的映射,這對(duì)值的第一個(gè)元素就是Spout任務(wù)ID蔓同,第二個(gè)元素是64bit數(shù)字饶辙,稱為“ack val”,它是tuple tree中所有消息id的異或結(jié)果斑粱。ack val代表了整棵樹的狀態(tài)弃揽,當(dāng)這個(gè)ack val為0時(shí)就代表整棵樹已經(jīng)被處理完成了。
它的異或原理就是,當(dāng)我們無論創(chuàng)建一個(gè)節(jié)點(diǎn)還是完成一個(gè)節(jié)點(diǎn)都使用消息ID來與之異或矿微,這樣同一個(gè)消息ID一來一回異或結(jié)果就為0了痕慢。
如上圖,ack val最終值為T1T2T3T4T5T1T2T3T4^T5=0涌矢。
選擇合適的可靠性
Acker任務(wù)是輕量級(jí)的掖举,所以拓?fù)渲胁恍枰芏郃cker任務(wù),我們可以通過Storm UI來查看Acker吞吐量娜庇,如果吞吐量很差塔次,可以適當(dāng)添加Acker任務(wù)。
如果我們認(rèn)為消息可靠性不是必要的(處理失敗情況下丟失消息沒有關(guān)系)名秀,我們可以關(guān)閉消息可靠性励负。這樣拓?fù)湫阅芤矔?huì)提升,因?yàn)椴桓櫾M樹匕得,傳輸?shù)南?huì)減半(因?yàn)樵M樹中每個(gè)元組都需要發(fā)送一條確認(rèn)信息)继榆。并且每個(gè)下游元組中保留更少的數(shù)據(jù)(不需要存儲(chǔ)根ID),從而減少帶寬使用汁掠。
關(guān)閉消息可靠性的方式:
- 將Config.TOPOLOGY_ACKERS設(shè)置為0略吨,這樣Spout發(fā)送元組后,它的ack方法會(huì)被立即調(diào)用考阱。
- 在Spout中使用SpoutOutputCollector.emit發(fā)送消息時(shí)不指定消息ID翠忠。這樣可以對(duì)一些特定消息關(guān)閉消息可靠性。
- 如果不在意某個(gè)消息派生出子消息的可靠性乞榨,那么在對(duì)應(yīng)的botl組件中可以不進(jìn)行錨定(不指定輸入tuple)负间。
學(xué)習(xí)資料
- Storm官網(wǎng)(http://storm.apache.org/releases/1.2.2/index.html),看了許多Storm書籍姜凄,大部分都是直接翻譯的Storm官方文檔。
- 《從零開始學(xué)Storm》對(duì)Storm講解的非常全面趾访,由淺入深态秧,覆蓋面比較廣。
- 《Storm源碼分析》對(duì)Storm從源碼級(jí)別進(jìn)行了講解扼鞋,有些地方原理講的還是不錯(cuò)的申鱼,就是該書講解的Storm版本比較低。