storm_初識

<h3>簡介:</h3>
Storm是一個(gè)分布式的,可靠的凹蜈,容錯(cuò)的數(shù)據(jù)流處理系統(tǒng)公般。它會把工作任務(wù)委托給不同類型的組件,每個(gè)組件負(fù)責(zé)處理一項(xiàng)簡單特定的任務(wù)择份。Storm集群的輸入流由一個(gè)被稱作spout的組件管理扣孟,spout把數(shù)據(jù)傳遞給bolt,bolt要么把數(shù)據(jù)保存到某種存儲器荣赶,要么把數(shù)據(jù)傳遞給其它的bolt凤价。你可以想象一下,一個(gè)Storm集群就是在一連串的bolt之間轉(zhuǎn)換spout傳過來的數(shù)據(jù)拔创。
<h4>Storm的特性</h4>
在所有這些設(shè)計(jì)思想與決策中利诺,有一些非常棒的特性成就了獨(dú)一無二的Storm。
<li><b>簡化編程</b></li> 如果你曾試著從零開始實(shí)現(xiàn)實(shí)時(shí)處理剩燥,你應(yīng)該明白這是一件多么痛苦的事情慢逾。使用Storm,復(fù)雜性被大大降低了灭红。使用一門基于JVM的語言開發(fā)會更容易侣滩,但是你可以借助一個(gè)小的中間件,在Storm上使用任何語言開發(fā)变擒。有現(xiàn)成的中間件可供選擇君珠,當(dāng)然也可以自己開發(fā)中間件。
<li><b>容錯(cuò)</b></li> Storm集群會關(guān)注工作節(jié)點(diǎn)狀態(tài)娇斑,如果宕機(jī)了必要的時(shí)候會重新分配任務(wù)葛躏。
可擴(kuò)展 所有你需要為擴(kuò)展集群所做的工作就是增加機(jī)器澈段。Storm會在新機(jī)器就緒時(shí)向它們分配任務(wù)悠菜。
<li><b>可靠</b></li> 所有消息都可保證至少處理一次舰攒。如果出錯(cuò)了,消息可能處理不只一次悔醋,不過你永遠(yuǎn)不會丟失消息摩窃。
<li><b>快速</b></li> 速度是驅(qū)動Storm設(shè)計(jì)的一個(gè)關(guān)鍵因素
<li><b>事務(wù)性</b></li> You can get exactly once messaging semantics for pretty much any computation.你可以為幾乎任何計(jì)算得到恰好一次消息語義。
<h4>storm分布式計(jì)算結(jié)構(gòu)</h4>
Storm 分布式計(jì)算結(jié)構(gòu)稱為topology(拓?fù)?芬骄,由stream(數(shù)據(jù)流)猾愿、spout(數(shù)據(jù)流的生成者)、bolt(運(yùn)算)組成,如下圖账阻。
<pre>


Paste_Image.png

</pre>

<h3>storm本地demo搭建:</h3>
作為一個(gè)程序員來說蒂秘,最大的毛病就是一言不合就上代碼,好吧淘太。我們就枚舉一個(gè)官方用的比較多的例子吧姻僧,spot來噴發(fā)字符串,bolt1來以空格來分隔字符串繼續(xù)向后續(xù)的計(jì)算模塊bolt2分發(fā),bolt2來通過來收集相同字符出現(xiàn)次數(shù)繼續(xù)向計(jì)算模塊bolt3分發(fā)蒲牧,然后bolt3收集blot2的結(jié)果最終打印結(jié)果手動結(jié)束撇贺。如下圖:

Paste_Image.png

新建pom項(xiàng)目,添加pom依賴:
<pre>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.5</version>
</dependency>
</dependencies>
</pre>
SentenceSpout--單詞生成類:
<pre>
public class SentenceSpout extends BaseRichSpout {
//用來發(fā)射數(shù)據(jù)的工具類
private SpoutOutputCollector collector;
private String[] sentences = {"my dog has fleas","i like cold beverages","the dog ate my homework","don't have a cow man","i don't think i like fleas"};
private int index = 0;
//每調(diào)用一次就可以向storm集群中發(fā)射一條數(shù)據(jù)(一個(gè)tuple元組)冰抢,該方法會被不停的調(diào)用
public void nextTuple() {
this.collector.emit(new Values(sentences[index]));
index ++;
if(index >= sentences.length){
index = 0;
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
//初始化collector
@SuppressWarnings("rawtypes")
public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
// 定義字段id松嘶,該id在簡單模式下沒有用處,但在按照字段分組的模式下有很大的用處挎扰。 該declarer變量有很大作用翠订,我們還可以調(diào)用declarer.declareStream();來定義stramId,該id可以用來定義更加復(fù)雜的流拓?fù)浣Y(jié)構(gòu)
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
</pre>
SplitSentenceBolt--單詞分隔類
<pre>
@SuppressWarnings("serial")
public class SplitSentenceBolt extends BaseRichBolt {
//用來定義繼續(xù)向后續(xù)的計(jì)算模塊發(fā)射數(shù)據(jù)的工具類
private OutputCollector collector;
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for(String word : words){
this.collector.emit(new Values(word));
}
}
//初始化
@SuppressWarnings("rawtypes")
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
</pre>
WordCountBolt--相同的單詞統(tǒng)計(jì)
<pre>
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
private HashMap<String, Long> counts = null;
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = this.counts.get(word);
if (count == null) {
count = 1L;
}
count++;
this.counts.put(word, count);
this.collector.emit(new Values(word, count));
}
@SuppressWarnings("rawtypes")
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
counts = new HashMap<>();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}
</pre>
BaseRichBolt--統(tǒng)計(jì)單詞結(jié)果
<pre>
@SuppressWarnings("serial")
public class ReportBolt extends BaseRichBolt {
private HashMap<String, Long> counts = null;
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = tuple.getLongByField("count");
this.counts.put(word, count);
}
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
counts = new HashMap<>();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
}
public void cleanup(){
System.err.println("---final counts---");
List<String> keys = new ArrayList<>();
keys.addAll(counts.keySet());
Collections.sort(keys);
for(String key : keys){
System.err.println(key + " : " + this.counts.get(key));
}
System.err.println("---end---");
}
}
</pre>
最后的最后我要上main方法啦:
<pre>
public class WordCountTopology {
private static final String SPOUT = "spout";
private static final String SPLIT_BOLT = "splitBolt";
private static final String COUNT_BOLT = "countBolt";
private static final String REPORT_BOLT = "reportBolt";
private static final String TOPOLOGY_NAME = "wordCountTopology";
public static void main(String[] args) throws InterruptedException {
//數(shù)據(jù)發(fā)射器
SentenceSpout spout = new SentenceSpout();
//字符串分隔計(jì)算模塊
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
//字符串統(tǒng)計(jì)模塊
WordCountBolt countBolt = new WordCountBolt();
//最后打印模塊
ReportBolt reportBolt = new ReportBolt();
//創(chuàng)建Topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUT, spout);
builder.setBolt(SPLIT_BOLT, splitBolt).shuffleGrouping(SPOUT);
builder.setBolt(COUNT_BOLT, countBolt).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
builder.setBolt(REPORT_BOLT, reportBolt).globalGrouping(COUNT_BOLT);
Config config = new Config();
//定義本地storm集群遵倦,如果放在linux虛擬機(jī)上跑略有不同
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
Thread.sleep(10000);
//kill Topology,當(dāng)Topology啟動以后會一直執(zhí)行直到kill Topology
cluster.killTopology(TOPOLOGY_NAME);
//關(guān)閉集群尽超,這個(gè)方法跟redis的集群關(guān)閉一樣
cluster.shutdown();
}
}
</pre>
最后統(tǒng)計(jì)結(jié)果:
<pre>
---final counts---
a : 1553
ate : 1554
beverages : 1554
cold : 1554
cow : 1553
dog : 3107
don't : 3105
fleas : 3106
has : 1554
have : 1553
homework : 1554
i : 4658
like : 3106
man : 1553
my : 3107
the : 1554
think : 1553
---end---
</pre>
如果說這么就結(jié)束了,是不是太快了啊骇吭,來分析分析BaseRichSpout橙弱、BaseRichBolt代碼結(jié)構(gòu)吧:
<pre>

Paste_Image.png

</pre>
<p>關(guān)于BaseRichSpout的ack和fail這兩個(gè)方法我不得不講一下,因?yàn)槭强梢杂迷赥uple Stream傳遞后確認(rèn)成功和失敗燥狰,當(dāng)成功以后可以打印成功的log失敗以后可以在fail方法中定義重發(fā).
</p>

<pre>

Paste_Image.png

</pre>
<p>關(guān)于BaseRichBolt的cleanup定義一個(gè)bolt結(jié)束時(shí)被執(zhí)行棘脐,但是不能保證被執(zhí)行。</p>
<p>BaseComponent類的存在也就是為了隔出一層出來實(shí)現(xiàn)getComponentConfiguration避免讓更多的不需要的子類累贅實(shí)現(xiàn)這個(gè)方法龙致。</p>
<p>總的來說storm暴露給我們常用的這幾個(gè)類以及其結(jié)構(gòu)還是很簡單的蛀缝,跟我們自身自己寫代碼的層次差不多,很容易理解目代。</p>
storm成長之路_初識就講到這里啦屈梁,謝謝嗤练!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市在讶,隨后出現(xiàn)的幾起案子煞抬,更是在濱河造成了極大的恐慌,老刑警劉巖构哺,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件革答,死亡現(xiàn)場離奇詭異,居然都是意外死亡曙强,警方通過查閱死者的電腦和手機(jī)残拐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來碟嘴,“玉大人溪食,你說我怎么就攤上這事∧壬龋” “怎么了错沃?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長袱衷。 經(jīng)常有香客問我捎废,道長,這世上最難降的妖魔是什么致燥? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任登疗,我火速辦了婚禮,結(jié)果婚禮上嫌蚤,老公的妹妹穿的比我還像新娘辐益。我一直安慰自己,他們只是感情好脱吱,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布智政。 她就那樣靜靜地躺著,像睡著了一般箱蝠。 火紅的嫁衣襯著肌膚如雪续捂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天宦搬,我揣著相機(jī)與錄音牙瓢,去河邊找鬼。 笑死间校,一個(gè)胖子當(dāng)著我的面吹牛矾克,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播憔足,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼胁附,長吁一口氣:“原來是場噩夢啊……” “哼酒繁!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起控妻,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤州袒,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后饼暑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體稳析,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年弓叛,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诚纸。...
    茶點(diǎn)故事閱讀 40,561評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡撰筷,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出畦徘,到底是詐尸還是另有隱情毕籽,我是刑警寧澤,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布井辆,位于F島的核電站关筒,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏杯缺。R本人自食惡果不足惜蒸播,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望萍肆。 院中可真熱鬧袍榆,春花似錦、人聲如沸塘揣。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽亲铡。三九已至才写,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間奖蔓,已是汗流浹背赞草。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留锭硼,地道東北人房资。 一個(gè)月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像檀头,于是被迫代替她去往敵國和親轰异。 傳聞我的和親對象是個(gè)殘疾皇子岖沛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容