Storm 是一個(gè)分布式的东且,可靠的埃跷,容錯(cuò)的數(shù)據(jù)流處理系統(tǒng)接癌。下面我將分別從storm的整體架構(gòu)以及部分原理進(jìn)行講解心赶。
一、基本的概念
storm中服務(wù)器節(jié)點(diǎn)分為主節(jié)點(diǎn)和從節(jié)點(diǎn),Nimbus為主節(jié)點(diǎn)和Supervisor為從節(jié)點(diǎn)缺猛。以及若干組件構(gòu)成缨叫。下面為對(duì)一些術(shù)語(yǔ)進(jìn)行簡(jiǎn)單的介紹:
Nimbus:主節(jié)點(diǎn),是一個(gè)調(diào)度中心荔燎,負(fù)責(zé)分發(fā)任務(wù)
Supervisor:從節(jié)點(diǎn)耻姥,任務(wù)執(zhí)行的地方
Worker:任務(wù)工作進(jìn)程,一個(gè)Supervisor中可以有多個(gè)Worker湖雹。
Executor:Worker進(jìn)程在執(zhí)行任務(wù)時(shí)咏闪,會(huì)啟動(dòng)多個(gè)Executor線程
Topology:任務(wù)的抽象概念。由于storm是流式計(jì)算的框架摔吏,它的數(shù)據(jù)流和拓?fù)鋱D很像鸽嫂,所以它的任務(wù)就叫topology。
Spout:從數(shù)據(jù)源獲取數(shù)據(jù)并進(jìn)行分發(fā)征讲。
Bolt:得到Spout或者上一個(gè)Bolt的數(shù)據(jù),然后進(jìn)行處理后交給下一個(gè)Bolt處理据某。
Tuple:在storm中,一條數(shù)據(jù)可以理解為是一個(gè)Tuple诗箍。
二癣籽、storm的架構(gòu)
任務(wù)提交處理流程
Nimbus是調(diào)度中心,Supervisor是任務(wù)執(zhí)行的地方滤祖。Supervisor上面有若干個(gè)Worker筷狼,每個(gè)Worker都有自己的端口,Worker可以理解為一個(gè)進(jìn)程。另外匠童,每個(gè)Worker中還可以運(yùn)行若干個(gè)線程埂材。
當(dāng)客戶端向storm集群提交一個(gè)Topology時(shí),這里的提交就是在集群上通過(guò)命令storm jar xxx
啟動(dòng)topology汤求。如果我們是在Supervisor節(jié)點(diǎn)上執(zhí)行storm jar xxx
俏险,那么Supervisor會(huì)將jar包拷貝到Nimbus,之后Nimbus對(duì)Topology進(jìn)行調(diào)度严拒。
Nimbus會(huì)根據(jù)Topology所需要的Worker進(jìn)行分配,將其分配到各個(gè)Supervisor的節(jié)點(diǎn)上執(zhí)行竖独。
現(xiàn)在假設(shè)我們我們有4個(gè)Supervisor節(jié)點(diǎn)裤唠,每個(gè)Supervisor都配置4個(gè)Worker。這是我們提交了一個(gè)Topology莹痢,需要4個(gè)Worker,那可能的分配情況可能如下圖所示:
storm中的數(shù)據(jù)流
啟動(dòng)完Topology后,相關(guān)組件就開(kāi)始運(yùn)行起來(lái)了种蘸。在Storm中,Spout組件主要用來(lái)從數(shù)據(jù)源拉取數(shù)據(jù),形成一個(gè)Tuple后轉(zhuǎn)交給Bolt處理格二。Bolt接受到Tuple處理完后,可以選擇繼續(xù)交給下一個(gè)Bolt處理劈彪,也可以選擇不往下傳竣蹦。這樣數(shù)據(jù)以Tuple的形式一個(gè)接一個(gè)的往下執(zhí)行,就形成了一個(gè)拓?fù)鋽?shù)據(jù)流顶猜。
storm數(shù)據(jù)在組件間的流向如下圖所示:
三、Storm的并發(fā)度
在Storm中痘括,Worker不是組件執(zhí)行的最小單位长窄。Executor才是,Executor可以理解為是一個(gè)線程纲菌。我們?cè)趧?chuàng)建topology的時(shí)候挠日,可以設(shè)置執(zhí)行spout的線程數(shù)和bolt的線程數(shù)。
假設(shè)spout和bolt的線程數(shù)加起來(lái)設(shè)置了8個(gè)翰舌,然后設(shè)置了2個(gè)worker嚣潜,那么這8個(gè)線程可能就會(huì)隨機(jī)分配到2個(gè)worker中,可能一個(gè)worker3個(gè)椅贱,一個(gè)worker5個(gè)懂算。也有可能各自分配4個(gè)。如下圖所示:
四庇麦、數(shù)據(jù)的Grouping策略
在實(shí)際應(yīng)用中,Bolt組件的實(shí)例可能有多個(gè),Tuple在流向Bolt時(shí)计技,選擇哪個(gè)Bolt實(shí)例的策略就是grouping策略。
下面是Storm中的6種Grouping策略:
- Shuffle Grouping: 隨機(jī)分組山橄, 隨機(jī)派發(fā)stream里面的tuple垮媒, 保證每個(gè)bolt接收到的tuple數(shù)目相同。輪詢航棱,平均分配睡雇。
- Fields Grouping:按字段分組, 比如按userid來(lái)分組饮醇, 具有同樣userid的tuple會(huì)被分到相同的Bolts它抱, 而不同的userid則會(huì)被分配到不同的Bolts。
- All Grouping: 廣播發(fā)送驳阎, 對(duì)于每一個(gè)tuple抗愁, 所有的Bolts都會(huì)收到馁蒂。
- Global Grouping: 全局分組, 這個(gè)tuple被分配到storm中的一個(gè)bolt的其中一個(gè)task蜘腌。再具體一點(diǎn)就是分配給id值最低的那個(gè)task沫屡。
- Non Grouping: 不分組, 這個(gè)分組的意思是說(shuō)stream不關(guān)心到底誰(shuí)會(huì)收到它的tuple撮珠。目前這種分組和Shuffle grouping是一樣的效果沮脖,不平均分配。
- Direct Grouping: 直接分組芯急, 這是一種比較特別的分組方法勺届,用這種分組意味著消息的發(fā)送者舉鼎由消息接收者的哪個(gè)task處理這個(gè)消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法娶耍。而且這種消息tuple必須使用emitDirect方法來(lái)發(fā)射免姿。消息處理者可以通過(guò)TopologyContext來(lái)或者處理它的消息的taskid(OutputCollector.emit方法也會(huì)返回taskid)
五、消息的可靠性保證 —— ack機(jī)制
一條數(shù)據(jù)在Spout中形成一個(gè)Tuple榕酒,然后交給一個(gè)個(gè)Bolt執(zhí)行,那我們?cè)趺幢WC這個(gè)Tuple被完整的執(zhí)行了呢胚膊?這里的完整執(zhí)行說(shuō)的是這個(gè)Tuple必須在后面的每一個(gè)Bolt都成功處理,假設(shè)在一個(gè)Bolt中發(fā)生異常導(dǎo)致失敗想鹰,這就不能算完整處理紊婉。
為了保證消息處理過(guò)程中的可靠性,storm使用了ack機(jī)制。storm會(huì)專門啟動(dòng)若干acker線程辑舷,來(lái)追蹤tuple的處理過(guò)程喻犁。acker線程數(shù)量可以設(shè)置。
每一個(gè)Tuple在Spout中生成的時(shí)候,都會(huì)分配到一個(gè)64位的messageId何缓。通過(guò)對(duì)messageId進(jìn)行哈希我們可以執(zhí)行要對(duì)哪個(gè)acker線程發(fā)送消息來(lái)通知它監(jiān)聽(tīng)這個(gè)Tuple肢础。
acker線程收到消息后,會(huì)將發(fā)出消息的Spout和那個(gè)messageId綁定起來(lái)。然后開(kāi)始跟蹤該tuple的處理流程歌殃。如果這個(gè)tuple全部都處理完乔妈,那么acker線程就會(huì)調(diào)用發(fā)起這個(gè)tuple的那個(gè)spout實(shí)例的ack()方法。如果超過(guò)一定時(shí)間這個(gè)tuple還沒(méi)處理完氓皱,那么acker線程就會(huì)調(diào)用對(duì)應(yīng)spout的fail()方法,通知spout消息處理失敗路召。spout組件就可以重新發(fā)送這個(gè)tuple。
從上面的介紹我們知道了波材,tuple數(shù)據(jù)的流向會(huì)形成一個(gè)拓?fù)鋱D股淡,也可以理解成是一個(gè)tuple樹(shù)。這個(gè)拓?fù)鋱D的節(jié)點(diǎn)可能會(huì)有很多個(gè)廷区,如果要把這些節(jié)點(diǎn)全部保存起來(lái)唯灵,處理大量的數(shù)據(jù)時(shí)勢(shì)必會(huì)造成內(nèi)存溢出。
對(duì)于這個(gè)難題隙轻,storm使用了一種非常巧妙的方法埠帕,使用20個(gè)字節(jié)就可以追蹤一個(gè)tuple是否被完整的執(zhí)行垢揩。這也是storm的一個(gè)突破性的技術(shù)。
ack機(jī)制的具體原理
我們都知道,自己異或自己,結(jié)果肯定為零( a ^ a = 0)敛瓷。ack中就利用這個(gè)特性
- acker對(duì)于每個(gè)spout-tuple保存一個(gè)ack-val的校驗(yàn)值叁巨,它的初始值是0, 然后每發(fā)射一個(gè)tuple/ack一個(gè)tuple呐籽,那么tuple的id都要跟這個(gè)校驗(yàn)值異或一下锋勺。注意,這里的tuple的id不是spout-tuple的id,和我們上面理解的messageId不是一個(gè)概念狡蝶,要區(qū)分一下,是每個(gè)新生產(chǎn)的tuple的id庶橱,這個(gè)tupleId是隨機(jī)生成的64位比特值
- 之后把得到的值更新為ack-val的新值。那么假設(shè)每個(gè)發(fā)射出去的tuple都被ack了贪惹, 那么最后ack-val一定是0(因?yàn)橐粋€(gè)數(shù)字跟自己異或得到的值是0)苏章。
舉個(gè)例子,比如發(fā)射了某個(gè)tuple,就 ack-val ^ tupleId馍乙,然后ack了某個(gè)tuple,就再ack-val ^ tupleId布近,這樣,ack-val 最終又變成了0丝格,說(shuō)明tuple已經(jīng)全部處理成功了。
六棵譬、Storm的HA保證——高可用性保證
1. 數(shù)據(jù)方面的高可用
使用ack機(jī)制保證數(shù)據(jù)處理的高可用
2. Worker進(jìn)程掛了怎么辦显蝌?
Supervisor會(huì)自動(dòng)重啟worker線程。
3. Supervisor節(jié)點(diǎn)失效了怎么辦订咸?
可以在其他節(jié)點(diǎn)重啟該supervisor任務(wù)曼尊。
4. Nimbus掛了怎么辦?
在storm1.0之前,Nimbus是不支持HA的脏嚷。Nimbus如果掛了骆撇,重啟Nimbus進(jìn)程就可以了,不會(huì)影響到現(xiàn)有topology的運(yùn)行父叙。
因?yàn)镹imbus只是一個(gè)調(diào)度中心神郊,Nimbus和Supervisor的狀態(tài)都保存在本地文件和ZooKeeper,因此他們進(jìn)程可以隨便殺死趾唱,然后重啟涌乳,不會(huì)影響到Worker進(jìn)程的運(yùn)行。
另外甜癞,Nimbus的作用在就是在拓?fù)淙蝿?wù)開(kāi)始階段夕晓,負(fù)責(zé)將任務(wù)提交到集群,后期負(fù)責(zé)拓?fù)淙蝿?wù)的管理悠咱,比如任務(wù)查看蒸辆,終止等操作征炼。在通常情況下,nimbus的任務(wù)壓力并不會(huì)很大躬贡,在自然情況下不會(huì)出現(xiàn)宕機(jī)的情況柒室。
storm1.0后Nimbus的HA策略還沒(méi)有具體研究過(guò),有興趣的小伙伴可自行前往官網(wǎng)查看文檔逗宜。http://storm.apache.org/releases/1.2.1/nimbus-ha-design.html
七雄右、總結(jié)
Storm的架構(gòu)及原理整體理解起來(lái)不算很難,但很多細(xì)節(jié)還是需要在實(shí)踐中才能發(fā)現(xiàn)纺讲。有興趣的小伙伴可以去讀讀storm的源碼擂仍,storm源碼大多數(shù)都是用Clojure實(shí)現(xiàn),對(duì)Clojure語(yǔ)言不熟悉的朋友可以去看一下JStorm的源碼實(shí)現(xiàn)熬甚。這是阿里基于Storm用java實(shí)現(xiàn)的框架逢渔,據(jù)說(shuō)更加穩(wěn)定高效。
最后乡括,哪里有說(shuō)的不對(duì)的地方肃廓。敬請(qǐng)支出,感激不盡诲泌。