原文鏈接Storm Tutorial
本人原創(chuàng)翻譯蛆封,轉(zhuǎn)載請注明出處
這個教程內(nèi)容包含如何創(chuàng)建topologies及部署到Storm集群上咐低。Java是主要使用語言号醉,但有的例子使用了Python,主要是為了解釋Storm的多語言能力衔瓮。
前言
本教程使用的例子來自storm-starter project。建議clone該project并照著練習(xí)抖甘。閱讀Setting up a development environment和 Creating a new Storm project以使你的計算機具備開始條件热鞍。
這兩篇文章本人已翻譯,請閱Storm(一)打造開發(fā)環(huán)境&創(chuàng)建一個Storm項目
Storm集群的組件
表面上看Storm集群和Hadoop集群有些像衔彻。在Hadoop上運行的是"MapReduce jobs"薇宠,而在Storm上運行的是topologies。"Jobs" 和 "topologies"大不相同艰额,有一個關(guān)鍵不同就是MapReduce job最終會停止澄港,而topology永不停止(除非被用戶kill掉)。
Storm集群有兩類節(jié)點:master節(jié)點和worker節(jié)點柄沮。master節(jié)點上運行著一個守護程序 "Nimbus"(和Hadoop的 "JobTracker"有些像)回梧。Nimbus負(fù)責(zé)在集群中散布code废岂,給各個機器分配任務(wù)以及監(jiān)控失敗的情況。每個worker節(jié)點上也運行著一個守護程序"Supervisor"狱意。Supervisor負(fù)責(zé)接收Nimbus分配的任務(wù)湖苞,按需啟動和停止worker進程。每個worker進程執(zhí)行了一個topology的子集髓涯。一個運行中的topology包含了多個worker袒啼,這些worker分布在多個機器上。
所有Nimbus和Supervisors的協(xié)調(diào)都通過Zookeeper集群進行纬纪。此外蚓再,Nimbus和Supervisors是立即失敗和無狀態(tài)的(fail-fast and stateless)。所有的狀態(tài)都保存在Zookeeper或本地硬盤上包各。這意味著即使通過kill -9 殺死Nimbus和Supervisors摘仅,他們也會自動恢復(fù),這個設(shè)計給了Storm集群難以置信的穩(wěn)定性问畅。
Topologies
要利用Storm來進行實時計算娃属,就要創(chuàng)建Topologies。一個topology是一個計算的圖护姆,topology 中的每個節(jié)點包含處理邏輯矾端,topology 中的每個邊(link)指明了數(shù)據(jù)如何在節(jié)點中傳遞。運行一個topology很簡單卵皂,首先打包code和依賴到j(luò)ar文件秩铆,然后執(zhí)行以下命令:
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
這樣就啟動了org.apache.storm.MyTopology類,參數(shù)是arg1灯变、arg2殴玛,這個類的主函數(shù)定義了 topology并提交到Nimbus。由于topology是Thrift structs添祸,而且Nimbus是Thrift service滚粟,因此可以用任意的編程語言創(chuàng)建和提交topologies。這里舉的是基于JVM語言的最簡單的例子刃泌,更多信息請閱Running topologies on a production cluster
Streams
Storm 中最核心的抽象概念是"stream"凡壤。stream是元組(tuples)的無限序列,Storm提供了一種分布式耙替、可靠的方式來將一個stream轉(zhuǎn)化成一個新的stream鲤遥。舉個例子,你可以將一個tweets stream轉(zhuǎn)化成一個trending topics stream林艘。
Storm提供了"spouts" 和 "bolts"來完成stream的轉(zhuǎn)化。通過實現(xiàn)Spouts和bolts的接口混坞,你可以運行應(yīng)用相關(guān)的邏輯狐援。
spout是stream的來源钢坦,舉個例子,spout可以從Kestrel隊列中讀取tuples并生成stream啥酱,或者spout也可以通過Twitter API生成一個tweets stream爹凹。
bolt可以消費任意數(shù)量的輸入stream,做一些處理镶殷,很可能拋出新的stream禾酱。類似將tweets stream轉(zhuǎn)化成trending topics stream這樣的復(fù)雜轉(zhuǎn)化,常常需要多個步驟绘趋,對應(yīng)著多個bolt颤陶。Bolts可以做的事情很多,比如run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases等等陷遮。
spouts和bolts的網(wǎng)絡(luò)被打包成了"topology"滓走,這是你提交到Storm集群執(zhí)行的最高級別的抽象。 topology是一個stream轉(zhuǎn)化的圖帽馋,圖的節(jié)點是spout或bolt搅方,圖的邊指明了哪個bolt在訂閱哪個stream。當(dāng)一個spout 或 bolt發(fā)出一個tuple到一個stream绽族,那么訂閱了這個stream的所有bolt都會收到這個tuple姨涡。
Storm topology中的每個節(jié)點都是并行運行,在你的topology中吧慢,你可以指定各個節(jié)點的并行程度涛漂,Storm會按照你指定的數(shù)目在集群中啟動相應(yīng)數(shù)量的線程。
topology會永久執(zhí)行下去娄蔼,除非你停止它怖喻。Storm會自動重新分配失敗的任務(wù),此外岁诉,Storm保證數(shù)據(jù)不會丟失锚沸,即便機器宕機并且messages are dropped。
Data model
Storm的數(shù)據(jù)模型是tuple涕癣。tuple是一個命名list哗蜈,字段可以是任意類型的對象。Storm支持所有primitive types, strings, and byte arrays作為tuple的字段值坠韩。如果要使用其他類型的對象距潘,只需要實現(xiàn)serializer。
topology的每個節(jié)點都需要定義輸出的tuple字段只搁。如下的例子中音比,bolt定義了兩個帶有 "double"和"triple"字段的tuple。
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}
一個簡單的topology
看一下storm-starter中ExclamationTopology的定義:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
這個Topology包含一個spout和兩個bolt氢惋,spout發(fā)出words洞翩,bolt在輸入的string后面加上"!!!"稽犁。節(jié)點被組織成一條直線:spout發(fā)出到第一個bolt,然后到第二個bolt骚亿。如果spout發(fā)出了tuples ["bob"]和["john"]已亥,那么第二個bolt會發(fā)出 ["bob!!!!!!"]和["john!!!!!!"]。
setSpout 和 setBolt方法定義了節(jié)點来屠,第一個參數(shù)是用戶定義的ID虑椎,第二個參數(shù)是處理邏輯對象,第三個參數(shù)是并發(fā)線程數(shù)俱笛。
spout的處理邏輯對象實現(xiàn)了IRichSpout接口捆姜,bolt的處理邏輯對象實現(xiàn)了IRichBolt接口。最后一個參數(shù)是可選的嫂粟,如果不指定娇未,Storm只分配一個線程。
setBolt返回一個InputDeclarer對象星虹,這個對象定義了bolt的輸入零抬。這里組件exclaim1聲明了它要讀取所有組件words發(fā)出的tuples。組件exclaim2聲明了它要讀取所有組件exclaim1發(fā)出的tuples宽涌。"shuffle grouping"會被隨機的分配到bolt平夜。組件之間有多種分組數(shù)據(jù)的方式,如果想要組件exclaim2同時讀取組件words和exclaim1的tuples卸亮,可以像這樣定義:
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
接下來看看這個topology的spout和bolt的實現(xiàn)忽妒。spout負(fù)責(zé)發(fā)出數(shù)據(jù)到topology。TestWordSpout每隔100ms從["nathan", "mike", "jackson", "golda", "bertels"]中發(fā)出隨機word作為一個tuple兼贸,TestWordSpout的nextTuple()實現(xiàn)如下:
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
ExclamationBolt的實現(xiàn)如下:
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
prepare方法給bolt提供了一個OutputCollector段直,用來從這個bolt發(fā)出tuple。Tuples可以在任何時候發(fā)出——包括prepare, execute, 或cleanup方法溶诞,甚至在另一個線程中異步發(fā)出鸯檬。這里prepare方法只是保存OutputCollector為實例變量,后面execute方法會用到螺垢。
execute方法接收一個tuple喧务,ExclamationBolt抓取tuple的第一個字段并在后面加上"!!!"。如果bolt訂閱了多個輸入源枉圃,可以通過Tuple#getSourceComponent方法查詢tuple來源功茴。
輸入tuple被作為了emit的第一個參數(shù),最后一行ack了輸入tuple孽亲。這些是Storm可靠性API的一部分坎穿,用以保證沒有數(shù)據(jù)丟失,本教程后續(xù)會進一步介紹返劲。
cleanup方法在bolt停止的時候調(diào)用玲昧,在這里應(yīng)該關(guān)閉所有打開的資源犯祠。Storm集群不保證一定會調(diào)用這個方法:例如,if the machine the task is running on blows up, there's no way to invoke the method. cleanup方法適用于local模式酌呆,你可以運行和停止許多topologies,不必?fù)?dān)心資源泄露搔耕。
declareOutputFields方法聲明了ExclamationBolt發(fā)出帶一個名為"word"字段的1-tuples隙袁。
getComponentConfiguration方法允許你配置組件運行的參數(shù)。這是一個更高級的主題Configuration
在一個bolt的實現(xiàn)中弃榨,像cleanup 和 getComponentConfiguration這樣的方法常常不需要菩收。你可以通過繼承提供了默認(rèn)實現(xiàn)的基類來更簡潔的實現(xiàn)bolt。例如:
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
在local模式下運行ExclamationTopology
在local模式中鲸睛,Storm完全在一個進程內(nèi)運行娜饵,worker通過線程模擬,主要用于測試和開發(fā)場景官辈。當(dāng)你運行storm-starte中的topologies箱舞,他們將在local模式下運行,你能夠看到每個組件正在發(fā)出的消息拳亿。
更多l(xiāng)ocal模式信息請閱Local mode
更多分布式模式信息請閱running topologies in local mode on Local mode
如下是local模式下運行ExclamationTopology的代碼:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
首先晴股,通過創(chuàng)建LocalCluster對象定義了一個進程內(nèi)的集群。提交topologies到這個虛擬集群和提交到分布式集群是完全一樣的操作肺魁。submitTopology方法的第一個參數(shù)是topology的名字电湘,第二個參數(shù)是topology的配置,第三個參數(shù)是topology對象鹅经。
這里topology的配置很常見:
TOPOLOGY_WORKERS (由setNumWorkers設(shè)置) 指定了你想分配多少進程來執(zhí)行這個topology寂呛。 topology中的每個組件將以線程的形式執(zhí)行,線程的數(shù)量由setBolt 和 setSpout配置瘾晃。
TOPOLOGY_DEBUG (由setDebug設(shè)置) 當(dāng)設(shè)置為true時, Storm記錄組件發(fā)出的每個消息贷痪。
更多配置信息請閱the Javadoc for Config.
Stream groupings
stream grouping用于描述組件之間如何發(fā)送tuple。在集群中酗捌,spouts 和 bolts總是并行執(zhí)行任務(wù)呢诬,在執(zhí)行任務(wù)層面上,一個topology看起來如下圖所示:
當(dāng)Bolt A的一個任務(wù)發(fā)出tuple到Bolt B時胖缤,應(yīng)該發(fā)到Bolt B的哪個任務(wù)呢尚镰?
stream groupings就是用來解決這個問題。在深入了解不同種類的stream groupings之前哪廓,我們先看看storm-starter中的另一個topology狗唉。WordCountTopology從spout中讀取句子,WordCountBolt統(tǒng)計單詞出現(xiàn)的次數(shù)涡真。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));
SplitSentence發(fā)出句子中的單詞作為tuple分俯,WordCount以map的形式存儲單詞出現(xiàn)的次數(shù)肾筐,每次WordCount收到單詞,就更新map并發(fā)出新的單詞數(shù)目缸剪。
最簡單的grouping方式是"shuffle grouping"吗铐,也就是隨機發(fā)送tuple給任務(wù)。"fields grouping"是一個更有趣的grouping方式杏节,這里用在了SplitSentence bolt 和 WordCount bolt之間唬渗。對WordCount bolt來說,相同的單詞應(yīng)該發(fā)送到相同的任務(wù)奋渔,否則多個任務(wù)都會收到同樣的單詞镊逝,由于每個任務(wù)的信息都不完全,他們可能會發(fā)出錯誤的單詞數(shù)目嫉鲸。fields grouping用字段的子集來分組撑蒜,字段值相同的tuple被發(fā)送到相同的任務(wù)。
Fields groupings是實現(xiàn)streaming joins 和 streaming aggregations玄渗,它的底層實現(xiàn)利用了mod hashing座菠。還有一些其他的分組方式,請閱Concepts捻爷。
使用其他編程語言定義Bolts
Bolts可以使用任意語言定義辈灼,用JVM-based語言以外定義的Bolts以子進程的方式運行,Storm以stdin/stdout之上的JSON格式消息與子進程通信也榄。通信協(xié)議用到了一個100行左右的適配器庫巡莹,支持Ruby, Python, Fancy。
WordCountTopology的SplitSentence定義如下:
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
SplitSentence重寫了ShellBolt甜紫,聲明它使用python降宅,參數(shù)是splitsentence.py。splitsentence.py實現(xiàn)如下:
import storm
class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
SplitSentenceBolt().run()
更多多語言信息囚霸,請閱Using non-JVM languages with Storm
確保消息被處理
這部分屬于Storm's reliability API的內(nèi)容:Storm如何保證spout發(fā)出的消息都能被處理腰根,請閱Guaranteeing Message Processing
Transactional topologies
Storm保證每個消息都至少被處理一次恰起。一個常見的問題是:Storm會不會overcount署鸡?Storm提供了一種機制,確保消息只被傳遞一次梧躺。transactional topologies劣挫。
Distributed RPC
除了本教程已經(jīng)展示的功能册养,Storm還可以做許多事。其中最有趣的應(yīng)用之一是Distributed RPC压固。