本文主要介紹storm中的基本概念,從基礎(chǔ)上了解strom的體系結(jié)構(gòu),便于后續(xù)編程過程中作為基礎(chǔ)指導(dǎo)。主要的概念包括:
- topology(拓?fù)洌?/li>
- stream(數(shù)據(jù)流)
- spout(水龍頭、數(shù)據(jù)源)
- bolt(螺栓,數(shù)據(jù)篩選處理)
- stream group(數(shù)據(jù)流分組)
- reliability(可靠性)
- task(任務(wù))
- worker(執(zhí)行者)
因?yàn)樯鲜龈拍钪谐丝煽啃詒eliability翻譯起來比較合適验靡,其他幾個(gè)詞實(shí)在找不到合適的對(duì)應(yīng)詞語,就直接使用原詞雏节。
另外一點(diǎn)需要注意的是胜嗓,本文使用的storm-core版本是0.10.0,包路徑為backtype.storm钩乍。因?yàn)榘⒗锇桶烷_源了jstorm辞州,據(jù)說strom2.0之后使用jstorm作為master主干,從github上可以看到包路徑修改為了org.apache.storm件蚕,如果發(fā)現(xiàn)有包路徑錯(cuò)誤的地方孙技,請(qǐng)對(duì)應(yīng)修改产禾。
topology
Storm實(shí)時(shí)運(yùn)行應(yīng)用包邏輯上成為一個(gè)topology,一個(gè)Storm的topology相當(dāng)于MapReduce的job牵啦。關(guān)鍵的不同是MapReduce的job有明確的起始和結(jié)束亚情,而Storm的topology會(huì)一直運(yùn)行下去(除非進(jìn)程被殺死或取消部署)。一個(gè)topology是有多個(gè)spout哈雏、bolt通過數(shù)據(jù)流分組連接起來的圖結(jié)構(gòu)楞件。
本地調(diào)試
本地調(diào)試模擬了集群模式運(yùn)行方式,對(duì)于開發(fā)和調(diào)試topology很有用裳瘪。而且本地模式下運(yùn)行topology與集群模式下類似土浸,只是使用backtype.storm.LocalCluster
來模擬集群狀態(tài)。使用backtype.storm.LocalCluster#submitTopology
方法提交topology彭羹,定義topology唯一名字黄伊、topology的配置(使用的是backtype.storm.Config
對(duì)象)、以及topology對(duì)象(通過backtype.storm.topology.TopologyBuilder#createTopology
方法創(chuàng)建)派殷。通過backtype.storm.LocalCluster#killTopology
殺掉指定topology还最,通過backtype.storm.LocalCluster#shutdown
停止運(yùn)行的本地集群模式。比如:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(100000);
cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
cluster.shutdown();
本地模式常用的配置如下:
- Config.TOPOLOGY_MAX_TASK_PARALLELISM:這個(gè)配置項(xiàng)主要用來設(shè)置每個(gè)組件線程數(shù)的上限毡惜。在生產(chǎn)環(huán)境中拓轻,每個(gè)topology中有很多并行線程,但是在本地調(diào)試過程中经伙,沒有必要存在這么多并行線程扶叉,可以通過這個(gè)配置來進(jìn)行設(shè)置。
- Config.TOPOLOGY_DEBUG:設(shè)置為true帕膜,Storm將記錄每個(gè)tuple提交后的日志信息枣氧,對(duì)于調(diào)試程序很有用。
集群模式運(yùn)行
集群模式下運(yùn)行topology與本地模式下類似泳叠,具體步驟如下:
- 定義topology(java下使用
backtype.storm.topology.TopologyBuilder#createTopology
創(chuàng)建) - 通過
backtype.storm.StormSubmitter#submitTopology
提交topology到集群作瞄。StormSubmitter需要的參數(shù)與
LocalCluster`的參數(shù)一致:topology名茶宵、topology配置危纫、topology對(duì)象。比如:
Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);
- 將自己的代碼與依賴的代碼打成jar包(除了storm自己的代碼乌庶,storm自己的代碼已經(jīng)在classpath下了)种蝶。
如果使用的是Mava,可以使用Maven Assembly Plugin打包瞒大,在pom.xml中加入如下代碼:
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.path.to.main.Class</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
- 使用storm客戶端將topology提交到集群螃征,需要指定jar包路徑、類名透敌、以及提交到main方法的參數(shù)列表:
./bin/storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
- 可以使用
storm kill
命令停止一個(gè)topology:
./bin/storm kill topologyName
數(shù)據(jù)流
數(shù)據(jù)流是Storm核心定義的抽象概念盯滚,由無限制的tuple組成的序列踢械,tuple包含一個(gè)或多個(gè)鍵值對(duì)列表,可以包含java自帶的類型或者自定義的可序列化的類型魄藕。
每個(gè)數(shù)據(jù)流可以在定義時(shí)通過backtype.storm.topology.OutputFieldsDeclarer
的declareStream方法指定id内列。默認(rèn)的id是“default”(直接使用declare將使用默認(rèn)id)。
在上面的topology圖中背率,每個(gè)藍(lán)色话瞧、綠色、紅色的條帶是一個(gè)數(shù)據(jù)流寝姿,每個(gè)數(shù)據(jù)流內(nèi)部由tuple組成交排。
spout
spout是topology中數(shù)據(jù)流的數(shù)據(jù)入口,充當(dāng)數(shù)據(jù)采集器功能饵筑,通常spout從外部數(shù)據(jù)源讀取數(shù)據(jù)埃篓,將數(shù)據(jù)轉(zhuǎn)化為tuple,然后將它們發(fā)送到topology中根资。spout可以是可靠的或不可靠的都许。可靠的spout能夠保證在storm處理tuple出現(xiàn)異常情況下嫂冻,能夠重新發(fā)送該tuple胶征,而不可靠的spout不再處理已發(fā)送的tuple。
spout通過backtype.storm.topology.OutputFieldsDeclarer
的declareStream
方法定義數(shù)據(jù)流桨仿,通過backtype.storm.spout.SpoutOutputCollector
的emit
方法發(fā)送tream睛低。
backtype.storm.spout.ISpout#nextTuple
方法是spout的主要方法,可以發(fā)送用于發(fā)送新的tuple服傍,或直接return(不需要發(fā)送新的tuple時(shí)钱雷,可以直接return)。
當(dāng)Storm檢測(cè)到由某一spout發(fā)送的tuple成功處理后吹零,將調(diào)用backtype.storm.spout.ISpout#ack
方法罩抗;當(dāng)調(diào)用失敗,將調(diào)用backtype.storm.spout.ISpout#fail
方法灿椅。具體可以查看后面的可靠性套蒂。
bolt
在topology中所有操作都是在bolt中執(zhí)行的,它可以進(jìn)行過濾茫蛹、計(jì)算操刀、連接、聚合婴洼、數(shù)據(jù)庫讀寫骨坑,以及其他操作〖聿桑可以將一個(gè)或多個(gè)spout作為輸入欢唾,對(duì)數(shù)據(jù)進(jìn)行運(yùn)算后且警,選擇性的輸出一個(gè)或多個(gè)數(shù)據(jù)流。一個(gè)bolt可以做一些簡(jiǎn)單的數(shù)據(jù)變換礁遣,復(fù)雜的數(shù)據(jù)處理需要多個(gè)步驟或多個(gè)bolt振湾。
bolt可以訂閱一個(gè)或多個(gè)spout或bolt的數(shù)據(jù),通過backtype.storm.topology.OutputFieldsDeclarer#declareStream
方法定義輸出的數(shù)據(jù)流亡脸,通過backtype.storm.topology.BasicOutputCollector#emit
方法提交數(shù)據(jù)押搪。
bolt通過backtype.storm.topology.InputDeclarer
類的shuffleGrouping
方法指定需要訂閱的數(shù)據(jù)流,比如:declarer.shuffleGrouping("1", "stream_id")
浅碾,同時(shí)InputDeclarer
也提供了接收所有數(shù)據(jù)流的語法糖大州,比如:declarer.shuffleGrouping("1")
,相當(dāng)于declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)
垂谢。這個(gè)地方有點(diǎn)亂厦画,簡(jiǎn)單的說,bolt B前面有一個(gè)spout A或bolt A滥朱,從A中發(fā)送一個(gè)id為a_id的數(shù)據(jù)流根暑,如果B向只訂閱id為a_id的數(shù)據(jù)流,就使用第一個(gè)方法徙邻,如果可以接收所有id類型的數(shù)據(jù)流排嫌,就用第二個(gè)方法。
該類型中主要執(zhí)行的方法是cn.howardliu.demo.storm.kafka.wordCount.SentenceBolt#execute
缰犁,用來獲取新的tuple淳地,并進(jìn)行處理。同樣使用backtype.storm.topology.BasicOutputCollector#emit
方法發(fā)送新的tuple帅容。bolt可以調(diào)用backtype.storm.task.OutputCollector#ack
方法來通知Storm該tuple已經(jīng)處理完成颇象。
數(shù)據(jù)流分組
定義topology的很重要的一部分就是定義數(shù)據(jù)流數(shù)據(jù)流應(yīng)該發(fā)送到那些bolt中。數(shù)據(jù)流分組就是將數(shù)據(jù)流進(jìn)行分組并徘,按需要進(jìn)入不同的bolt中遣钳。可以使用Storm提供的分組規(guī)則麦乞,也可以實(shí)現(xiàn)backtype.storm.grouping.CustomStreamGrouping
自定義分組規(guī)則蕴茴。Storm定義了8種內(nèi)置的數(shù)據(jù)流分組方法:
- Shuffle grouping(隨機(jī)分組):隨機(jī)分發(fā)tuple給bolt的各個(gè)task,每個(gè)bolt實(shí)例接收到相同數(shù)量的tuple路幸;
- Fields grouping(按字段分組):根據(jù)指定字段的值進(jìn)行分組荐开。比如付翁,一個(gè)數(shù)據(jù)流按照"user-id"分組简肴,所有具有相同"user-id"的tuple將被路由到同一bolt的task中,不同"user-id"可能路由到不同bolt的task中百侧;
- Partial Key grouping(部分key分組):數(shù)據(jù)流根據(jù)field進(jìn)行分組砰识,類似于按字段分組能扒,但是將在兩個(gè)下游bolt之間進(jìn)行均衡負(fù)載,當(dāng)資源發(fā)生傾斜的時(shí)候能夠更有效率的使用資源辫狼。The Power of Both Choices: Practical Load
Balancing for Distributed Stream Processing Engines提供了更加詳細(xì)的說明初斑; - All grouping(全復(fù)制分組):將所有tuple復(fù)制后分發(fā)給所有bolt的task。小心使用膨处。
- Global grouping(全局分組):將所有的tuple路由到唯一一個(gè)task上见秤。Storm按照最小的task ID來選取接收數(shù)據(jù)的task;(注意真椿,當(dāng)時(shí)用全局分組是鹃答,設(shè)置bolt的task并發(fā)是沒有意義的,因?yàn)樗衪uple都轉(zhuǎn)發(fā)到一個(gè)task上突硝。同時(shí)需要注意的是测摔,所有tuple轉(zhuǎn)發(fā)到一個(gè)jvm實(shí)例上,可能會(huì)引起storm集群某個(gè)jvm或服務(wù)器出現(xiàn)性能瓶頸或崩潰)
- None grouping(不分組):這種分組方式指明不需要關(guān)心分組方式解恰。實(shí)際上锋八,不分組功能與隨機(jī)分組相同。預(yù)留功能护盈。
- Direct grouping(指向型分組):數(shù)據(jù)源會(huì)調(diào)用emitDirect來判斷一個(gè)tuple應(yīng)該由哪個(gè)storm組件接收挟纱,只能在聲明了指向型的數(shù)據(jù)流上使用。
- Local or shuffle grouping(本地或隨機(jī)分組):當(dāng)同一個(gè)worker進(jìn)程中有目標(biāo)bolt腐宋,將把數(shù)據(jù)發(fā)送到這些bolt中樊销。否則,功能將與隨機(jī)分組相同脏款。該方法取決與topology的并發(fā)度围苫,本地或隨機(jī)分組可以減少網(wǎng)絡(luò)傳輸,降低IO撤师,提高topology性能剂府。
可靠行
storm可以保證每一個(gè)spout發(fā)出的tuple能夠被完整處理,通過跟蹤tuple樹上的每個(gè)tuple剃盾,檢查是否被成功處理腺占。每個(gè)topology有一個(gè)超時(shí)時(shí)間,如果storm檢查到某個(gè)tuple已經(jīng)超時(shí)痒谴,將重新發(fā)送該tuple衰伯。為了使用這種特性,需要定義tuple的起點(diǎn)积蔚,以及tuple被成功處理意鲸。更多內(nèi)容查看Guaranteeing message processing。
task
task是spout和bolt的實(shí)例,他們的nextTuple()和execute()方法會(huì)被executors線程調(diào)用執(zhí)行怎顾。根據(jù)數(shù)據(jù)流分組來確定如何從某個(gè)task中的tuple發(fā)送到其他的task读慎。
worker
topology運(yùn)行在一個(gè)或多個(gè)worker進(jìn)程上,worker是jvm虛擬機(jī)槐雾,運(yùn)行topology所有task的一部分夭委。比如,topology的并發(fā)是300募强,有50個(gè)worker株灸,那每個(gè)worker就有6個(gè)task。Storm會(huì)平衡所有worker的task數(shù)量擎值。通過Config.TOPOLOGY_WORKERS
來設(shè)置topology的worker數(shù)量蚂且。
個(gè)人主頁: http://www.howardliu.cn
個(gè)人博文: storm筆記:storm基本概念
CSDN主頁: http://blog.csdn.net/liuxinghao
CSDN博文: storm筆記:storm基本概念