本文借鑒官文晓猛,添加了一些解釋和看法,其中有些理解凡辱,寫的比較粗糙戒职,有問題的地方希望大家指出。寫這篇文章透乾,是想把一些官文和資料中基礎(chǔ)洪燥、重點(diǎn)拿出來,能總結(jié)出便于大家理解的話語乳乌。與大多數(shù)“wordcount”代碼不同的是捧韵,并不會有如何運(yùn)行第一storm代碼等內(nèi)容,只有在運(yùn)行完代碼后汉操,發(fā)現(xiàn)需要明白:“知其然再来,并知其所以然”。
Storm是什么?為什么要用Storm芒篷?為什么不用Spark搜变?
第一個問題,以下概念足以解釋:
Storm是基于數(shù)據(jù)流的實(shí)時處理系統(tǒng)针炉,提供了大吞吐量的實(shí)時計(jì)算能力挠他。通過數(shù)據(jù)入口獲取每條到來的數(shù)據(jù),在一條數(shù)據(jù)到達(dá)系統(tǒng)的時候篡帕,立即會在內(nèi)存中進(jìn)行相應(yīng)的計(jì)算绩社;Storm適合要求實(shí)時性較高的數(shù)據(jù)分析場景。
第二問題:
很多場景下赂苗,我們希望系統(tǒng)能夠?qū)崟r的處理一條數(shù)據(jù)、甚至是事務(wù)贮尉。也就是說拌滋,在處理數(shù)據(jù)、事務(wù)的過程中猜谚,到達(dá)系統(tǒng)败砂,并能馬上得到結(jié)果。其次魏铅,在成萬上億條數(shù)據(jù)大量涌入系統(tǒng)時昌犹,也要求“實(shí)時”的到事務(wù)處理的結(jié)果。此時览芳,單個節(jié)點(diǎn)已經(jīng)是杯水車薪了斜姥,而Storm的關(guān)鍵一項(xiàng)是因?yàn)樗С址植际讲⑿杏?jì)算!如果說沧竟,你遇到了以上相似的場景铸敏,那Storm可以當(dāng)仁不讓的扛起實(shí)時處理的大旗!
第三個問題:
這個問題其實(shí)很難界定悟泵,因?yàn)镾park在RDD粒度上杈笔,可以滿足實(shí)時計(jì)算的要求,當(dāng)然糕非,使用RDD還有其他優(yōu)勢蒙具;但總的來說,Storm 的實(shí)時性更強(qiáng)朽肥。其次禁筏,Storm的框架完全按照流式處理的思想構(gòu)建,和項(xiàng)目場景結(jié)合性更強(qiáng)一些鞠呈。(Spark 用的不是很多融师,歡迎吐槽。)
進(jìn)入正題,
在看Storm之前蚁吝,很多人都對Hadoop有一定了解旱爆,為了能更快入戲舀射,我們以Hadoop為參照,以下是它使用yarn之前的架構(gòu)怀伦,對照Storm Server框架理解脆烟。
Hadoop、Storm系統(tǒng)和組件接口對比表:
Storm框架:
上面這幅圖是Stom框架圖房待,和很多分布式系統(tǒng)一樣邢羔,基于zk作為集群配置運(yùn)行的元數(shù)據(jù)基礎(chǔ)平臺。
nimbus和supervisor是服務(wù)器端守護(hù)進(jìn)程桑孩,守護(hù)進(jìn)程的文章會在Storm概念拜鹤、原理詳解及其應(yīng)用(二)Storm Cluster。
以下是對啟動一個應(yīng)用所需要的集群上JVM進(jìn)程線程的簡單介紹流椒,建議記憶后再繼續(xù)閱讀敏簿。
· Nodes (服務(wù)器):指配置在一個 Storm 集群中的服務(wù)器,會執(zhí)行 topology 的一部分
運(yùn)算宣虾。一個 Storm 集群可以包括一個或者多個工作 node 惯裕。
· Workers (JVM 虛擬機(jī)):指一個 node 上相互獨(dú)立運(yùn)行的 JVM 進(jìn)程。每個 node 可
以配置運(yùn)行一個或者多個 worker 绣硝。一個topology 會分配到一個或者多個 worker 上
運(yùn)行蜻势。
· Executor (線程):指一個 worker 的jvm 進(jìn)程中運(yùn)行的 Java 線程。多個 task 可以
指派給同一個 executer 來執(zhí)行鹉胖。除非是明確指定握玛, Storm 默認(rèn)會給每個 executor 分
配一個 task。
· Task (bolt/spout 實(shí)例): task 是 spout 和bolt 的實(shí)例次员, 它們的 nextTuple() 和
execute() 方法會被executors 線程調(diào)用執(zhí)行败许。
例如:
builder.setSpout(spoutName, spout, spoutParallelism).setNumTasks(2)
這里可以定義spoutParallelism = 2,即對應(yīng)兩個executor線程淑蔚,tasks為兩個實(shí)例市殷。
(此處配置的原理,會在接下來會講到worker和并發(fā)中解釋刹衫。)
可以看出醋寝,雖然在這設(shè)置了多個task實(shí)例,但是并行度并沒有提高(而executor在不同的worker上執(zhí)行带迟,存在并行)音羞,因?yàn)橹挥袃蓚€線程去運(yùn)行這些實(shí)例,只有設(shè)置足夠多的線程和實(shí)例才可以真正的提高并行度仓犬;在這設(shè)置多個實(shí)例主要是為了下面執(zhí)行rebalance的時候用到嗅绰。
為什么要用rebalance?
這里一直在啟動、操作的是“線程”窘面,真正的process需要在配置中設(shè)置worker數(shù)量翠语,也就是說topology啟動時已經(jīng)決定了worker數(shù)量(即并行數(shù)量)。因?yàn)閞ebalance不需要修改代碼财边,就可以動態(tài)修改topology的并行度肌括,這樣的話就必須提前配置好多個實(shí)例,在rebalance的時候主要是對之前設(shè)置多余的任務(wù)實(shí)例分配線程去執(zhí)行酣难。
在命令行動態(tài)修改并行度:
除了使用代碼進(jìn)行調(diào)整谍夭,還可以在shell命令行下對并行度進(jìn)行調(diào)整。
storm rebalance mytopology -w 10 -n 2 -e spout=2 -e bolt=2
表示 10秒之后對mytopology進(jìn)行并行度調(diào)整憨募。把spout調(diào)整為2個executor紧索,把bolt調(diào)整為2個executor
注意:并行度主要就是調(diào)整executor的數(shù)量,但是調(diào)整之后的executor的數(shù)量必須小于等于task的數(shù)量菜谣,如果分配的executor的線程數(shù)比task數(shù)量多的話也只能分配和task數(shù)量相等的executor齐板。
概念:
官方對于Storm下名詞概念的解釋如下:
1、Topologies
2葛菇、Streams
3、Spouts
4橡羞、Bolts
5眯停、Stream groupings
6、Reliability
7卿泽、Tasks
8莺债、Workers
1、Topologies(拓?fù)洌?/p>
Topology是Storm中實(shí)時應(yīng)用的一種封裝签夭。其功能 analogous to a MapReducejob齐邦,但唯一不同的是它是循環(huán)執(zhí)行的——無數(shù)據(jù)流等待,有數(shù)據(jù)流執(zhí)行第租,直到被kill progress措拇。
一個Topology是spouts和bolts組成并被Stream groupings連接的一副流程圖,相關(guān)概念如下:
Resources:
- TopologyBuilder: use this class to construct topologies in Java:在java中慎宾,該類構(gòu)建了topologies丐吓。
- Running topologies on a production cluster:在生產(chǎn)集群中,運(yùn)行多個topologies趟据。
- Local mode: Read this to learn how to develop and test topologies in local mode. 在本地模型中開發(fā)和測試topologies券犁。
Topology結(jié)構(gòu):
[圖片上傳中...(image.png-5e2118-1532299869992-0)]
2、Streams (流)
Stream在Storm中是一個核心的抽象概念汹碱。一個流是由無數(shù)個元組序列構(gòu)成粘衬,這些元組并行、分布式的被創(chuàng)建和執(zhí)行。在stream的許多元組中稚新,Streams被定義為以Fields區(qū)域命名的一種模式勘伺。默認(rèn)情況下,元組支持:integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays. 你也可以定義自己的序列化器枷莉,使這種風(fēng)格類型能夠被自然的使用在元組中娇昙。
每一個Stream在聲明的時候都會賦予一個id。單個Stream——spouts和bolts笤妙,可以使用OutputFieldsDeclarer 的convenience方法聲明一個stream冒掌,而不用指定一個id。但是這種方法會給予一個默認(rèn)的id——default蹲盘,相關(guān)概念如下:
Resources:
- Tuple: streams are composed of tuples:Tuple是一個interface股毫,對應(yīng)的實(shí)現(xiàn)類 TupleImpl。
- OutputFieldsDeclarer: used to declare streams and their schemas
- Serialization: Information about Storm's dynamic typing of tuples and declaring custom serializations
Ps:Storm中的tuple是接口召衔,沒有具體實(shí)現(xiàn)铃诬,但原話這么解釋的:
Storm needs to know how to serialize all the values in a tuple. By default, Storm ** knows how to serialize the primitive types, strings, and byte arrays.*
3、Spouts
在Topology中苍凛,每個Spout都是一個Streams源趣席,通常情況下,Spouts會從外部源讀取Tuple醇蝴,并輸入這些Tuple到Topology中宣肚。
Spouts既是可靠的又是不可靠的,因?yàn)橛扑ǎ煽康膕pout會在發(fā)送Tuple失敗的情況下霉涨,重復(fù)發(fā)送;相反惭适,不可靠的spout會忘記它發(fā)送過的Tuple笙瑟,無論是否成功。
Spout代碼過程:
Spouts能夠發(fā)送多個流:使用OutputFieldsDeclarer(interface)的declareStream
方法聲明多個流癞志,并且當(dāng)使用SpoutOutputCollector(實(shí)現(xiàn)2往枷,接口模式)的emit方法可以指定這個流去發(fā)送Tuple。
Spouts的主要方法之一是:nextTuple() 發(fā)送tuple凄杯,nextTuple可以發(fā)送一個新的Tuple到Topology师溅,或者當(dāng)沒有新的Tuple被發(fā)送的時候,就簡單的返回盾舌。對于任何spout的實(shí)現(xiàn)墓臭,nextTuple都不能阻塞,因?yàn)镾torm調(diào)用的所有spout都是基于同一個線程妖谴!
其次是 ack 和 fail 方法窿锉,它們都會被調(diào)用酌摇,當(dāng)Storm發(fā)現(xiàn)一個tuple被從spout發(fā)射后,要么成功地完成的通過topology嗡载,要么錯誤的完成窑多。ack 和 fail 方法只有在可靠的spouts下才能被調(diào)用。spout可靠性洼滚,請搜本頁下面內(nèi)容埂息,或移至代碼。
Resources:
- IRichSpout: this is the interface that spouts must implement.
- Guaranteeing message processing
Ps:nextTuple()方法中會發(fā)送Tuple遥巴,至于那種對象能發(fā)送千康,請看上述。
Qu:1铲掐、在代碼中如何讓聲明的留和發(fā)送tuple聯(lián)系起來拾弃,因?yàn)槁暶髁鞯拿Q并不是tuple對象名?
2摆霉、是Storm中Spout的nextTuple對應(yīng)一個線程豪椿,還是多個Spout對應(yīng)一個線程?
answer:在集群中携栋,應(yīng)該是每個node的JVM中啟動一個線程跑spout
4搭盾、Bolts
在Topologies中所有的處理都會在bolts中被執(zhí)行,它能夠過濾tuple婉支、函數(shù)操作增蹭、合并(連接join、聚合aggregation)磅摹、數(shù)據(jù)庫讀寫等。Bolt可以做復(fù)雜的流傳輸霎奢,需要多步驟户誓、多bolt的連接。
Bolt也可以發(fā)射出一個或多個流幕侠,它需要使用OutputFieldsDeclarer 類的 declareStream 方法
聲明多個流帝美,并且需要指定這個流去使用OutputCollectorl類的emit方法去
發(fā)射。
當(dāng)你聲明一個bolt的輸入流時晤硕,你需要訂閱一個指定的其他組件的流悼潭。每一個流的訂閱都是一個個添加。InputDeclarer類可以聲明一個流在默認(rèn)的流id上舞箍。 declarer.shuffleGrouping("1") 說明在組件“1”上訂閱了這個默認(rèn)流舰褪,等價于``declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)。
Bolts的主要方法是execute
方法疏橄,它會吸收作為輸入的一個新Tuple占拍。Bolts使用 OutputCollector 對象發(fā)射新的Tuples略就。Bolts必須對每一個tuple調(diào)用OutputCollector
的ack
方法,以便于Storm知道什么時候元組們被處理完成(可以最終確定它的安全對于包裝這個初始化spout tuples)晃酒。 共同處理一個輸入元組的情況下,發(fā)射0或多個元組們基于元組表牢,然后包裝輸入元組,Storm提供一個IBasicBolt接口的自動包裝贝次。
在Bolts異步處理的時候崔兴,完全可以啟動新線程;同時OutputCollector是線程安全的蛔翅,可以在任何時候被調(diào)用敲茄。
Resources:
- IRichBolt: this is general interface for bolts.
- IBasicBolt: this is a convenience interface for defining bolts that do filtering or simple functions.
- OutputCollector: bolts emit tuples to their output streams using an instance of this class
- Guaranteeing message processing
Ps:bolt發(fā)送或接收的數(shù)據(jù)流都可以多對多的進(jìn)行。
[圖片上傳中...(image.png-2bf086-1532299931228-0)]
5搁宾、Stream groupings 流分組
定義一個拓?fù)洳糠质侵付嗣總€bolt門閂的流都應(yīng)該作為輸入被接收折汞。一個流分組定義為:在門閂的任務(wù)之中如何區(qū)分流。
在Storm中有8種流分組方式盖腿,通過實(shí)現(xiàn)CustomStreamGroupingj接口爽待,你可以實(shí)現(xiàn)一種風(fēng)格流分組方式:
Storm 定義了八種內(nèi)置數(shù)據(jù)流分組的方式:
1、Shuffle grouping(隨機(jī)分組):這種方式會隨機(jī)分發(fā) tuple 給bolt 的各個 task翩腐,每個bolt 實(shí)例接收到的相同數(shù)量的 tuple 鸟款。
2、Fields grouping(按字段分組):根據(jù)指定字段的值進(jìn)行分組茂卦。比如說何什,一個數(shù)據(jù)流根據(jù)“ word”字段進(jìn)行分組,所有具有相同“ word ”字段值的 tuple 會路由到同一個 bolt 的 task 中等龙。
3处渣、All grouping(全復(fù)制分組):將所有的 tuple 復(fù)制后分發(fā)給所有 bolt task北戏。每個訂閱數(shù)據(jù)流的 task 都會接收到 tuple 的拷貝克滴。
4、Globle grouping(全局分組):這種分組方式將所有的 tuples 路由到唯一一個 task 上珠十。Storm 按照最小的 task ID 來選取接收數(shù)據(jù)的 task 泥畅。注意荠诬,當(dāng)使用全局分組方式時,設(shè)置 bolt 的 task 并發(fā)度是沒有意義的(spout并發(fā)有意義)位仁,因?yàn)樗?tuple 都轉(zhuǎn)發(fā)到同一個 task 上了柑贞。使用全局分組的時候需要注意,因?yàn)樗械?tuple 都轉(zhuǎn)發(fā)到一個 JVM 實(shí)例上聂抢,可能會引起 Storm 集群中某個 JVM 或者服務(wù)器出現(xiàn)性能瓶頸或崩潰钧嘶。
5、None grouping(不分組):在功能上和隨機(jī)分組相同琳疏,是為將來預(yù)留的康辑。
6摄欲、Direct grouping(指向型分組):數(shù)據(jù)源會調(diào)用 emitDirect() 方法來判斷一個 tuple 應(yīng)該由哪個 Storm 組件來接收。只能在聲明了是指向型的數(shù)據(jù)流上使用疮薇。
7胸墙、Local or shuffle grouping (本地或隨機(jī)分組):和隨機(jī)分組類似,但是按咒,會將 tuple 分發(fā)給同一個 worker 內(nèi)的bolt task (如果 worker 內(nèi)有接收數(shù)據(jù)的 bolt task )迟隅。其他情況下,采用隨機(jī)分組的方式励七。取決于topology 的并發(fā)度智袭,本地或隨機(jī)分組可以減少網(wǎng)絡(luò)傳輸,從而提高 topology 性能掠抬。
8吼野、Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.
Resources:
- TopologyBuilder: use this class to define topologies
-
InputDeclarer: this object is returned whenever
setBolt
is called onTopologyBuilder
and is used for declaring a bolt's input streams and how those streams should be grouped
6、Reliability
Storm保證每一個spout tuple都將會在拓?fù)渲型暾谋惶幚砹讲āL幚磉^程:它會追蹤這個tuple tree被每一個spout tuple所觸發(fā)瞳步,并且確定tuple tree已經(jīng)成功完成。每個拓?fù)涠加幸粋€“信息超時”與之相關(guān)聯(lián)腰奋。假如Storm未能檢測到一個spout tuple已經(jīng)超時完成单起,它將舍棄并重新執(zhí)行這個tuple。
為了改善Storm的可靠性能力劣坊,你可以告訴Storm什么時候需要在元組樹種創(chuàng)建一個新的邊界嘀倒,告訴Storm無論在什么時候都可以完成處理一個獨(dú)立的tuple。Bolt們都使用了OutputCollector對象去發(fā)射tuple局冰〔饽ⅲ“錨定”(實(shí)際上就是mark)的完成于這個emit方法,你可以聲明一個元組使用了ack方法而被完成康二。
以上詳細(xì)的解釋了可靠消息處理碳胳。
7、Tasks
每個噴口spout或者門閂bolt都有許多任務(wù)在集群中執(zhí)行赠摇。每一個任務(wù)對應(yīng)一個執(zhí)行線程,流分組定義了如何從一個任務(wù)集到另外一個任務(wù)集發(fā)送元組浅蚪。你可以使用TopologyBuilder 類的setSpout和setBolt方法藕帜,為每一個spout或bolt是設(shè)置并行度和并發(fā)度。
Ps:Tasks可以理解為每個節(jié)點(diǎn)上的任務(wù)實(shí)例惜傲,運(yùn)行在對應(yīng)executor線程上洽故。
8、Workers
拓?fù)鋱?zhí)行要通過一個或多個worker進(jìn)程盗誊。每一個worker進(jìn)程都是一個物理的JVM和這個拓?fù)渲袌?zhí)行了一個所有這個任務(wù)的子集时甚。
例子:如果拓?fù)涞穆?lián)合并發(fā)數(shù)為300隘弊,分配了50個worker,因此每一個worker將會執(zhí)行6個task(task將作為worker的線程)荒适。Storm將會均勻的分配任務(wù)到所有worker上梨熙。
Resources:
- Config.TOPOLOGY_WORKERS: this config sets the number of workers to allocate for executing the topology
Worker結(jié)構(gòu):
Topology的并發(fā)機(jī)制:
storm的Worker、Executor刀诬、Task默認(rèn)配置都是1
1咽扇、增加worker(本地模式無效,只有一個JVM)
Config對象的setNumWorkers()方法:
Config config = new Config();
config.setNumWorkers(2):
2陕壹、配置executor 和 task
默認(rèn)都為1质欲,setXXX指定一個Worker中有幾個線程,而后面的setNumXXX指定總共需要執(zhí)行的tasks數(shù)量糠馆,因此嘶伟,一個Thread--Executor中需要跑tasks/threads個任務(wù)。
topologyBuilder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
// StormBaseSpout -> StormBaseBolt
topologyBuilder.setBolt(SPLIT_BOLT_ID, bolt).setNumTasks(2).shuffleGrouping(SENTENCE_SPOUT_ID);
// StormBaseBolt -> StormBaseBoltSecond
topologyBuilder.setBolt(COUNT_BOLT_ID, boltSecond, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
// StormBaseBoltSecond -> StormBaseBoltThird
topologyBuilder.setBolt(REPORT_BOLT_ID, boltThird).globalGrouping(COUNT_BOLT_ID);
storm的處理保障機(jī)制:
1又碌、spout的可靠性
spout會記錄它所發(fā)射出去的tuple九昧,當(dāng)下游任意一個bolt處理失敗時spout能夠重新發(fā)射該tuple。在spout的nextTuple()發(fā)送一個tuple時赠橙,為實(shí)現(xiàn)可靠消息處理需要給每個spout發(fā)出的tuple帶上唯一ID耽装,并將該ID作為參數(shù)傳遞給SpoutOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), tupleID);
實(shí)際上Values extends ArrayList<Object>
保障過程中,每個bolt每收到一個tuple期揪,都要向上游應(yīng)答或報(bào)錯掉奄,在tuple樹上的所有bolt都確認(rèn)應(yīng)答,spout才會隱式調(diào)用ack()方法表明這條消息(一條完整的流)已經(jīng)處理完畢凤薛,將會對編號ID的消息應(yīng)答確認(rèn)姓建;處理報(bào)錯、超時則會調(diào)用fail()方法缤苫。
2速兔、bolt的可靠性
bolt的可靠消息處理機(jī)制包含兩個步驟:
a、當(dāng)發(fā)射衍生的tuple活玲,需要錨定讀入的tuple
b涣狗、當(dāng)處理消息時,需要應(yīng)答或報(bào)錯
可以通過OutputCollector中emit()的一個重載函數(shù)錨定或tuple:collector.emit(tuple, new Values(word)); 并且需要調(diào)用一次this.collector.ack(tuple)應(yīng)答舒憾。
以上就是storm的基礎(chǔ)概念镀钓,閱讀完后并不能滿足你去實(shí)現(xiàn)代碼的需求,因?yàn)樾枰粋€可demo代碼镀迂,作為模仿的基礎(chǔ)丁溅。這里就不做提供了,畢竟網(wǎng)上一大堆探遵。
最近在研究Storm源代碼窟赏,不想與“源碼分析”一樣只告訴該類代碼:結(jié)構(gòu)妓柜、方式、用到了什么技術(shù)涯穷,而希望寫一些“特殊”的內(nèi)容棍掐;當(dāng)然有可能也不能免俗,但盡力寫點(diǎn)不同的東西求豫。
內(nèi)容有不妥的地方塌衰,希望大家指正,希望能一起進(jìn)步蝠嘉,文筆欠佳最疆,見諒。
此處配置的原理蚤告,會在接下來會講到worker和并發(fā)解釋努酸。
轉(zhuǎn)自:https://blog.csdn.net/kuring_k/article/details/51872112