Storm(二)官方Tutorial

原文鏈接Storm Tutorial

本人原創(chuàng)翻譯蛆封,轉(zhuǎn)載請注明出處

這個教程內(nèi)容包含如何創(chuàng)建topologies及部署到Storm集群上咐低。Java是主要使用語言号醉,但有的例子使用了Python,主要是為了解釋Storm的多語言能力衔瓮。

前言

本教程使用的例子來自storm-starter project。建議clone該project并照著練習(xí)抖甘。閱讀Setting up a development environmentCreating a new Storm project以使你的計算機具備開始條件热鞍。
這兩篇文章本人已翻譯,請閱Storm(一)打造開發(fā)環(huán)境&創(chuàng)建一個Storm項目

Storm集群的組件

表面上看Storm集群和Hadoop集群有些像衔彻。在Hadoop上運行的是"MapReduce jobs"薇宠,而在Storm上運行的是topologies。"Jobs" 和 "topologies"大不相同艰额,有一個關(guān)鍵不同就是MapReduce job最終會停止澄港,而topology永不停止(除非被用戶kill掉)。

Storm集群有兩類節(jié)點:master節(jié)點和worker節(jié)點柄沮。master節(jié)點上運行著一個守護程序 "Nimbus"(和Hadoop的 "JobTracker"有些像)回梧。Nimbus負(fù)責(zé)在集群中散布code废岂,給各個機器分配任務(wù)以及監(jiān)控失敗的情況。每個worker節(jié)點上也運行著一個守護程序"Supervisor"狱意。Supervisor負(fù)責(zé)接收Nimbus分配的任務(wù)湖苞,按需啟動和停止worker進程。每個worker進程執(zhí)行了一個topology的子集髓涯。一個運行中的topology包含了多個worker袒啼,這些worker分布在多個機器上。

所有Nimbus和Supervisors的協(xié)調(diào)都通過Zookeeper集群進行纬纪。此外蚓再,Nimbus和Supervisors是立即失敗和無狀態(tài)的(fail-fast and stateless)。所有的狀態(tài)都保存在Zookeeper或本地硬盤上包各。這意味著即使通過kill -9 殺死Nimbus和Supervisors摘仅,他們也會自動恢復(fù),這個設(shè)計給了Storm集群難以置信的穩(wěn)定性问畅。

Topologies

要利用Storm來進行實時計算娃属,就要創(chuàng)建Topologies。一個topology是一個計算的圖护姆,topology 中的每個節(jié)點包含處理邏輯矾端,topology 中的每個邊(link)指明了數(shù)據(jù)如何在節(jié)點中傳遞。運行一個topology很簡單卵皂,首先打包code和依賴到j(luò)ar文件秩铆,然后執(zhí)行以下命令:
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
這樣就啟動了org.apache.storm.MyTopology類,參數(shù)是arg1灯变、arg2殴玛,這個類的主函數(shù)定義了 topology并提交到Nimbus。由于topology是Thrift structs添祸,而且Nimbus是Thrift service滚粟,因此可以用任意的編程語言創(chuàng)建和提交topologies。這里舉的是基于JVM語言的最簡單的例子刃泌,更多信息請閱Running topologies on a production cluster

Streams

Storm 中最核心的抽象概念是"stream"凡壤。stream是元組(tuples)的無限序列,Storm提供了一種分布式耙替、可靠的方式來將一個stream轉(zhuǎn)化成一個新的stream鲤遥。舉個例子,你可以將一個tweets stream轉(zhuǎn)化成一個trending topics stream林艘。

Storm提供了"spouts" 和 "bolts"來完成stream的轉(zhuǎn)化。通過實現(xiàn)Spouts和bolts的接口混坞,你可以運行應(yīng)用相關(guān)的邏輯狐援。
spout是stream的來源钢坦,舉個例子,spout可以從Kestrel隊列中讀取tuples并生成stream啥酱,或者spout也可以通過Twitter API生成一個tweets stream爹凹。

bolt可以消費任意數(shù)量的輸入stream,做一些處理镶殷,很可能拋出新的stream禾酱。類似將tweets stream轉(zhuǎn)化成trending topics stream這樣的復(fù)雜轉(zhuǎn)化,常常需要多個步驟绘趋,對應(yīng)著多個bolt颤陶。Bolts可以做的事情很多,比如run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases等等陷遮。

spouts和bolts的網(wǎng)絡(luò)被打包成了"topology"滓走,這是你提交到Storm集群執(zhí)行的最高級別的抽象。 topology是一個stream轉(zhuǎn)化的圖帽馋,圖的節(jié)點是spout或bolt搅方,圖的邊指明了哪個bolt在訂閱哪個stream。當(dāng)一個spout 或 bolt發(fā)出一個tuple到一個stream绽族,那么訂閱了這個stream的所有bolt都會收到這個tuple姨涡。


Storm topology中的每個節(jié)點都是并行運行,在你的topology中吧慢,你可以指定各個節(jié)點的并行程度涛漂,Storm會按照你指定的數(shù)目在集群中啟動相應(yīng)數(shù)量的線程。

topology會永久執(zhí)行下去娄蔼,除非你停止它怖喻。Storm會自動重新分配失敗的任務(wù),此外岁诉,Storm保證數(shù)據(jù)不會丟失锚沸,即便機器宕機并且messages are dropped。

Data model

Storm的數(shù)據(jù)模型是tuple涕癣。tuple是一個命名list哗蜈,字段可以是任意類型的對象。Storm支持所有primitive types, strings, and byte arrays作為tuple的字段值坠韩。如果要使用其他類型的對象距潘,只需要實現(xiàn)serializer。
topology的每個節(jié)點都需要定義輸出的tuple字段只搁。如下的例子中音比,bolt定義了兩個帶有 "double"和"triple"字段的tuple。

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}

一個簡單的topology

看一下storm-starter中ExclamationTopology的定義:

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

這個Topology包含一個spout和兩個bolt氢惋,spout發(fā)出words洞翩,bolt在輸入的string后面加上"!!!"稽犁。節(jié)點被組織成一條直線:spout發(fā)出到第一個bolt,然后到第二個bolt骚亿。如果spout發(fā)出了tuples ["bob"]和["john"]已亥,那么第二個bolt會發(fā)出 ["bob!!!!!!"]和["john!!!!!!"]。

setSpout 和 setBolt方法定義了節(jié)點来屠,第一個參數(shù)是用戶定義的ID虑椎,第二個參數(shù)是處理邏輯對象,第三個參數(shù)是并發(fā)線程數(shù)俱笛。

spout的處理邏輯對象實現(xiàn)了IRichSpout接口捆姜,bolt的處理邏輯對象實現(xiàn)了IRichBolt接口。最后一個參數(shù)是可選的嫂粟,如果不指定娇未,Storm只分配一個線程。

setBolt返回一個InputDeclarer對象星虹,這個對象定義了bolt的輸入零抬。這里組件exclaim1聲明了它要讀取所有組件words發(fā)出的tuples。組件exclaim2聲明了它要讀取所有組件exclaim1發(fā)出的tuples宽涌。"shuffle grouping"會被隨機的分配到bolt平夜。組件之間有多種分組數(shù)據(jù)的方式,如果想要組件exclaim2同時讀取組件words和exclaim1的tuples卸亮,可以像這樣定義:

builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");

接下來看看這個topology的spout和bolt的實現(xiàn)忽妒。spout負(fù)責(zé)發(fā)出數(shù)據(jù)到topology。TestWordSpout每隔100ms從["nathan", "mike", "jackson", "golda", "bertels"]中發(fā)出隨機word作為一個tuple兼贸,TestWordSpout的nextTuple()實現(xiàn)如下:

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

ExclamationBolt的實現(xiàn)如下:

public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

prepare方法給bolt提供了一個OutputCollector段直,用來從這個bolt發(fā)出tuple。Tuples可以在任何時候發(fā)出——包括prepare, execute, 或cleanup方法溶诞,甚至在另一個線程中異步發(fā)出鸯檬。這里prepare方法只是保存OutputCollector為實例變量,后面execute方法會用到螺垢。

execute方法接收一個tuple喧务,ExclamationBolt抓取tuple的第一個字段并在后面加上"!!!"。如果bolt訂閱了多個輸入源枉圃,可以通過Tuple#getSourceComponent方法查詢tuple來源功茴。
輸入tuple被作為了emit的第一個參數(shù),最后一行ack了輸入tuple孽亲。這些是Storm可靠性API的一部分坎穿,用以保證沒有數(shù)據(jù)丟失,本教程后續(xù)會進一步介紹返劲。

cleanup方法在bolt停止的時候調(diào)用玲昧,在這里應(yīng)該關(guān)閉所有打開的資源犯祠。Storm集群不保證一定會調(diào)用這個方法:例如,if the machine the task is running on blows up, there's no way to invoke the method. cleanup方法適用于local模式酌呆,你可以運行和停止許多topologies,不必?fù)?dān)心資源泄露搔耕。

declareOutputFields方法聲明了ExclamationBolt發(fā)出帶一個名為"word"字段的1-tuples隙袁。

getComponentConfiguration方法允許你配置組件運行的參數(shù)。這是一個更高級的主題Configuration

在一個bolt的實現(xiàn)中弃榨,像cleanup 和 getComponentConfiguration這樣的方法常常不需要菩收。你可以通過繼承提供了默認(rèn)實現(xiàn)的基類來更簡潔的實現(xiàn)bolt。例如:

public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }    
}

在local模式下運行ExclamationTopology

在local模式中鲸睛,Storm完全在一個進程內(nèi)運行娜饵,worker通過線程模擬,主要用于測試和開發(fā)場景官辈。當(dāng)你運行storm-starte中的topologies箱舞,他們將在local模式下運行,你能夠看到每個組件正在發(fā)出的消息拳亿。
更多l(xiāng)ocal模式信息請閱Local mode
更多分布式模式信息請閱running topologies in local mode on Local mode

如下是local模式下運行ExclamationTopology的代碼:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

首先晴股,通過創(chuàng)建LocalCluster對象定義了一個進程內(nèi)的集群。提交topologies到這個虛擬集群和提交到分布式集群是完全一樣的操作肺魁。submitTopology方法的第一個參數(shù)是topology的名字电湘,第二個參數(shù)是topology的配置,第三個參數(shù)是topology對象鹅经。

這里topology的配置很常見:
TOPOLOGY_WORKERS (由setNumWorkers設(shè)置) 指定了你想分配多少進程來執(zhí)行這個topology寂呛。 topology中的每個組件將以線程的形式執(zhí)行,線程的數(shù)量由setBolt 和 setSpout配置瘾晃。
TOPOLOGY_DEBUG (由setDebug設(shè)置) 當(dāng)設(shè)置為true時, Storm記錄組件發(fā)出的每個消息贷痪。

更多配置信息請閱the Javadoc for Config.

Stream groupings

stream grouping用于描述組件之間如何發(fā)送tuple。在集群中酗捌,spouts 和 bolts總是并行執(zhí)行任務(wù)呢诬,在執(zhí)行任務(wù)層面上,一個topology看起來如下圖所示:


當(dāng)Bolt A的一個任務(wù)發(fā)出tuple到Bolt B時胖缤,應(yīng)該發(fā)到Bolt B的哪個任務(wù)呢尚镰?
stream groupings就是用來解決這個問題。在深入了解不同種類的stream groupings之前哪廓,我們先看看storm-starter中的另一個topology狗唉。WordCountTopology從spout中讀取句子,WordCountBolt統(tǒng)計單詞出現(xiàn)的次數(shù)涡真。

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("sentences", new RandomSentenceSpout(), 5);        
builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));

SplitSentence發(fā)出句子中的單詞作為tuple分俯,WordCount以map的形式存儲單詞出現(xiàn)的次數(shù)肾筐,每次WordCount收到單詞,就更新map并發(fā)出新的單詞數(shù)目缸剪。

最簡單的grouping方式是"shuffle grouping"吗铐,也就是隨機發(fā)送tuple給任務(wù)。"fields grouping"是一個更有趣的grouping方式杏节,這里用在了SplitSentence bolt 和 WordCount bolt之間唬渗。對WordCount bolt來說,相同的單詞應(yīng)該發(fā)送到相同的任務(wù)奋渔,否則多個任務(wù)都會收到同樣的單詞镊逝,由于每個任務(wù)的信息都不完全,他們可能會發(fā)出錯誤的單詞數(shù)目嫉鲸。fields grouping用字段的子集來分組撑蒜,字段值相同的tuple被發(fā)送到相同的任務(wù)。

Fields groupings是實現(xiàn)streaming joins 和 streaming aggregations玄渗,它的底層實現(xiàn)利用了mod hashing座菠。還有一些其他的分組方式,請閱Concepts捻爷。

使用其他編程語言定義Bolts

Bolts可以使用任意語言定義辈灼,用JVM-based語言以外定義的Bolts以子進程的方式運行,Storm以stdin/stdout之上的JSON格式消息與子進程通信也榄。通信協(xié)議用到了一個100行左右的適配器庫巡莹,支持Ruby, Python, Fancy。

WordCountTopology的SplitSentence定義如下:

public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

SplitSentence重寫了ShellBolt甜紫,聲明它使用python降宅,參數(shù)是splitsentence.py。splitsentence.py實現(xiàn)如下:

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

更多多語言信息囚霸,請閱Using non-JVM languages with Storm

確保消息被處理

這部分屬于Storm's reliability API的內(nèi)容:Storm如何保證spout發(fā)出的消息都能被處理腰根,請閱Guaranteeing Message Processing

Transactional topologies

Storm保證每個消息都至少被處理一次恰起。一個常見的問題是:Storm會不會overcount署鸡?Storm提供了一種機制,確保消息只被傳遞一次梧躺。transactional topologies劣挫。

Distributed RPC

除了本教程已經(jīng)展示的功能册养,Storm還可以做許多事。其中最有趣的應(yīng)用之一是Distributed RPC压固。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末球拦,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌坎炼,老刑警劉巖愧膀,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異谣光,居然都是意外死亡檩淋,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進店門萄金,熙熙樓的掌柜王于貴愁眉苦臉地迎上來狼钮,“玉大人,你說我怎么就攤上這事捡絮。” “怎么了莲镣?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵福稳,是天一觀的道長。 經(jīng)常有香客問我瑞侮,道長的圆,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任半火,我火速辦了婚禮越妈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘钮糖。我一直安慰自己梅掠,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布店归。 她就那樣靜靜地躺著阎抒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪消痛。 梳的紋絲不亂的頭發(fā)上且叁,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天,我揣著相機與錄音秩伞,去河邊找鬼逞带。 笑死,一個胖子當(dāng)著我的面吹牛纱新,可吹牛的內(nèi)容都是我干的展氓。 我是一名探鬼主播,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼怒炸,長吁一口氣:“原來是場噩夢啊……” “哼带饱!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤勺疼,失蹤者是張志新(化名)和其女友劉穎教寂,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體执庐,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡酪耕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了轨淌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片迂烁。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖递鹉,靈堂內(nèi)的尸體忽然破棺而出盟步,到底是詐尸還是另有隱情,我是刑警寧澤躏结,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布却盘,位于F島的核電站,受9級特大地震影響媳拴,放射性物質(zhì)發(fā)生泄漏黄橘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一屈溉、第九天 我趴在偏房一處隱蔽的房頂上張望塞关。 院中可真熱鬧,春花似錦子巾、人聲如沸帆赢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽匿醒。三九已至,卻和暖如春缠导,著一層夾襖步出監(jiān)牢的瞬間廉羔,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工僻造, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留憋他,地道東北人。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓髓削,卻偏偏與公主長得像竹挡,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子立膛,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容

  • Date: Nov 17-24, 2017 1. 目的 積累Storm為主的流式大數(shù)據(jù)處理平臺對實時數(shù)據(jù)處理的相關(guān)...
    一只很努力爬樹的貓閱讀 2,158評論 0 4
  • 參考文章: Apache Storm 官方文檔中文版 storm Tutorial 的解讀 + 個人理解 官方文檔...
    louisliaoxh閱讀 1,140評論 0 1
  • Storm入門系列之一:storm核心概念及特性 本文的將介紹一些 storm 入門的基礎(chǔ)知識揪罕,包括 storm ...
    zhaif閱讀 3,077評論 0 17
  • 什么是實時流計算梯码? 主要的處理模式可以分為:流處理,批處理 流處理是直接處理好啰,有時也分為在線,離線,近線(st...
    Bloo_m閱讀 5,050評論 1 1
  • 是什么時候開始框往,我喜歡上了寫點文字鳄抒,表達自己的心情。這個日子大概要追述到青春期的時候椰弊,那時候的我們正走...
    周周writing閱讀 515評論 1 1