最詳細(xì)的Storm入門教程(二)

Storm入門例子詳解-單詞計(jì)數(shù)器

概念

Storm 分布式計(jì)算結(jié)構(gòu)稱為 topology(拓?fù)洌┥髁辏?stream(數(shù)據(jù)流)示血, spout(數(shù)據(jù)流的生成者), bolt(運(yùn)算)組成。

Storm 的核心數(shù)據(jù)結(jié)構(gòu)是 tuple祠墅。 tuple是 包 含 了 一 個(gè) 或 者 多 個(gè) 鍵 值 對(duì) 的 列 表,Stream 是 由 無 限 制 的 tuple 組 成 的 序 列歌径。
spout 代表了一個(gè) Storm topology 的主要數(shù)據(jù)入口毁嗦,充當(dāng)采集器的角色,連接到數(shù)據(jù)源回铛,將數(shù)據(jù)轉(zhuǎn)化為一個(gè)個(gè) tuple狗准,并將 tuple 作為數(shù)據(jù)流進(jìn)行發(fā)射克锣。

bolt 可以理解為計(jì)算程序中的運(yùn)算或者函數(shù),將一個(gè)或者多個(gè)數(shù)據(jù)流作為輸入腔长,對(duì)數(shù)據(jù)實(shí)施運(yùn)算后袭祟,選擇性地輸出一個(gè)或者多個(gè)數(shù)據(jù)流。 bolt 可以訂閱多個(gè)由 spout 或者其他bolt 發(fā)射的數(shù)據(jù)流捞附,這樣就可以建立復(fù)雜的數(shù)據(jù)流轉(zhuǎn)換網(wǎng)絡(luò)巾乳。

Storm Topology

本例子單詞計(jì)數(shù) topology 的數(shù)據(jù)流大概是這樣:

單詞計(jì)數(shù)topology

項(xiàng)目搭建

新建類SentenceSpout.java(數(shù)據(jù)流生成者)


import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * 向后端發(fā)射tuple數(shù)據(jù)流
 * @author soul
 *
 */
public class SentenceSpout extends BaseRichSpout {
    
    //BaseRichSpout是ISpout接口和IComponent接口的簡單實(shí)現(xiàn),接口對(duì)用不到的方法提供了默認(rèn)的實(shí)現(xiàn)
    
    private SpoutOutputCollector collector;
    private String[] sentences = {
            "my name is soul",
            "im a boy",
            "i have a dog",
            "my dog has fleas",
            "my girl friend is beautiful"
    };
    
    private int index=0;
    
    /**
     * open()方法中是ISpout接口中定義鸟召,在Spout組件初始化時(shí)被調(diào)用胆绊。
     * open()接受三個(gè)參數(shù):一個(gè)包含Storm配置的Map,一個(gè)TopologyContext對(duì)象,提供了topology中組件的信息,SpoutOutputCollector對(duì)象提供發(fā)射tuple的方法欧募。
     * 在這個(gè)例子中,我們不需要執(zhí)行初始化,只是簡單的存儲(chǔ)在一個(gè)SpoutOutputCollector實(shí)例變量压状。
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
    }

    /**
     * nextTuple()方法是任何Spout實(shí)現(xiàn)的核心。
     * Storm調(diào)用這個(gè)方法跟继,向輸出的collector發(fā)出tuple何缓。
     * 在這里,我們只是發(fā)出當(dāng)前索引的句子,并增加該索引準(zhǔn)備發(fā)射下一個(gè)句子还栓。
     */
    public void nextTuple() {
        //collector.emit(new Values("hello world this is a test"));
        
        // TODO Auto-generated method stub
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index>=sentences.length) {
            index=0;
        }
        Utils.sleep(1);
    }

    /**
     * declareOutputFields是在IComponent接口中定義的碌廓,所有Storm的組件(spout和bolt)都必須實(shí)現(xiàn)這個(gè)接口
     * 用于告訴Storm流組件將會(huì)發(fā)出那些數(shù)據(jù)流,每個(gè)流的tuple將包含的字段
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        
        declarer.declare(new Fields("sentence"));//告訴組件發(fā)出數(shù)據(jù)流包含sentence字段
        
    }

}

新建類SplitSentenceBolt.java(單詞分割器)

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * 訂閱sentence spout發(fā)射的tuple流剩盒,實(shí)現(xiàn)分割單詞
 * @author soul
 *
 */
public class SplitSentenceBolt extends BaseRichBolt {
    //BaseRichBolt是IComponent和IBolt接口的實(shí)現(xiàn)
    //繼承這個(gè)類谷婆,就不用去實(shí)現(xiàn)本例不關(guān)心的方法
    
    private OutputCollector collector;

    /**
     * prepare()方法類似于ISpout 的open()方法。
     * 這個(gè)方法在blot初始化時(shí)調(diào)用辽聊,可以用來準(zhǔn)備bolt用到的資源,比如數(shù)據(jù)庫連接纪挎。
     * 本例子和SentenceSpout類一樣,SplitSentenceBolt類不需要太多額外的初始化,
     * 所以prepare()方法只保存OutputCollector對(duì)象的引用。
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector=collector;
        
    }

    /**
     * SplitSentenceBolt核心功能是在類IBolt定義execute()方法跟匆,這個(gè)方法是IBolt接口中定義异袄。
     * 每次Bolt從流接收一個(gè)訂閱的tuple,都會(huì)調(diào)用這個(gè)方法玛臂。
     * 本例中,收到的元組中查找“sentence”的值,
     * 并將該值拆分成單個(gè)的詞,然后按單詞發(fā)出新的tuple烤蜕。
     */
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        String sentence = input.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));//向下一個(gè)bolt發(fā)射數(shù)據(jù)
        }       
    }

    /**
     * plitSentenceBolt類定義一個(gè)元組流,每個(gè)包含一個(gè)字段(“word”)。
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("word"));
    }
    
}

新建類WordCountBolt.java(單詞計(jì)數(shù)器)


import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * 訂閱 split sentence bolt的輸出流迹冤,實(shí)現(xiàn)單詞計(jì)數(shù)讽营,并發(fā)送當(dāng)前計(jì)數(shù)給下一個(gè)bolt
 * @author soul
 *
 */
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    //存儲(chǔ)單詞和對(duì)應(yīng)的計(jì)數(shù)
    private HashMap<String, Long> counts = null;//注:不可序列化對(duì)象需在prepare中實(shí)例化
    
    /**
     * 大部分實(shí)例變量通常是在prepare()中進(jìn)行實(shí)例化,這個(gè)設(shè)計(jì)模式是由topology的部署方式?jīng)Q定的
     * 因?yàn)樵诓渴鹜負(fù)鋾r(shí),組件spout和bolt是在網(wǎng)絡(luò)上發(fā)送的序列化的實(shí)例變量泡徙。
     * 如果spout或bolt有任何non-serializable實(shí)例變量在序列化之前被實(shí)例化(例如,在構(gòu)造函數(shù)中創(chuàng)建)
     * 會(huì)拋出NotSerializableException并且拓?fù)鋵o法發(fā)布橱鹏。
     * 本例中因?yàn)镠ashMap 是可序列化的,所以可以安全地在構(gòu)造函數(shù)中實(shí)例化。
     * 但是,通常情況下最好是在構(gòu)造函數(shù)中對(duì)基本數(shù)據(jù)類型和可序列化的對(duì)象進(jìn)行復(fù)制和實(shí)例化
     * 而在prepare()方法中對(duì)不可序列化的對(duì)象進(jìn)行實(shí)例化莉兰。
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    /**
     * 在execute()方法中,我們查找的收到的單詞的計(jì)數(shù)(如果不存在挑围,初始化為0)
     * 然后增加計(jì)數(shù)并存儲(chǔ),發(fā)出一個(gè)新的詞和當(dāng)前計(jì)數(shù)組成的二元組。
     * 發(fā)射計(jì)數(shù)作為流允許拓?fù)涞钠渌鸼olt訂閱和執(zhí)行額外的處理糖荒。
     */
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;//如果不存在杉辙,初始化為0
        }
        count++;//增加計(jì)數(shù)
        this.counts.put(word, count);//存儲(chǔ)計(jì)數(shù)
        this.collector.emit(new Values(word,count));
    }

    /**
     * 
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        //聲明一個(gè)輸出流,其中tuple包括了單詞和對(duì)應(yīng)的計(jì)數(shù)寂嘉,向后發(fā)射
        //其他bolt可以訂閱這個(gè)數(shù)據(jù)流進(jìn)一步處理
        declarer.declare(new Fields("word","count"));
    }
    
}

新建類ReportBolt.java(報(bào)告生成器)


import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * 生成一份報(bào)告
 * @author soul
 *
 */
public class ReportBolt extends BaseRichBolt {
    
    private HashMap<String, Long> counts = null;//保存單詞和對(duì)應(yīng)的計(jì)數(shù)

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub

        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple input) {
        // TODO Auto-generated method stub

        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        this.counts.put(word, count);
        
        //實(shí)時(shí)輸出
        System.out.println("結(jié)果:"+this.counts);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        //這里是末端bolt奏瞬,不需要發(fā)射數(shù)據(jù)流枫绅,這里無需定義
    }
    
    /**
     * cleanup是IBolt接口中定義
     * Storm在終止一個(gè)bolt之前會(huì)調(diào)用這個(gè)方法
     * 本例我們利用cleanup()方法在topology關(guān)閉時(shí)輸出最終的計(jì)數(shù)結(jié)果
     * 通常情況下泉孩,cleanup()方法用來釋放bolt占用的資源,如打開的文件句柄或數(shù)據(jù)庫連接
     * 但是當(dāng)Storm拓?fù)湓谝粋€(gè)集群上運(yùn)行并淋,IBolt.cleanup()方法不能保證執(zhí)行(這里是開發(fā)模式寓搬,生產(chǎn)環(huán)境不要這樣做)。
     */
    public void cleanup(){
        System.out.println("---------- FINAL COUNTS -----------");
        
        ArrayList<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for(String key : keys){
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("----------------------------");
    }

}

修改程序主入口App.java


import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

/**
 * 實(shí)現(xiàn)單詞計(jì)數(shù)topology
 *
 */
public class App 
{
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";
    
    public static void main( String[] args ) //throws Exception
    {
        //System.out.println( "Hello World!" );
        //實(shí)例化spout和bolt
        
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();//創(chuàng)建了一個(gè)TopologyBuilder實(shí)例
        
        //TopologyBuilder提供流式風(fēng)格的API來定義topology組件之間的數(shù)據(jù)流
        
        //builder.setSpout(SENTENCE_SPOUT_ID, spout);//注冊(cè)一個(gè)sentence spout
        
        //設(shè)置兩個(gè)Executeor(線程)县耽,默認(rèn)一個(gè)
        builder.setSpout(SENTENCE_SPOUT_ID, spout,2);
        
        // SentenceSpout --> SplitSentenceBolt
        
        //注冊(cè)一個(gè)bolt并訂閱sentence發(fā)射出的數(shù)據(jù)流句喷,shuffleGrouping方法告訴Storm要將SentenceSpout發(fā)射的tuple隨機(jī)均勻的分發(fā)給SplitSentenceBolt的實(shí)例
        //builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        
        //SplitSentenceBolt單詞分割器設(shè)置4個(gè)Task,2個(gè)Executeor(線程)
        builder.setBolt(SPLIT_BOLT_ID, splitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
        
        // SplitSentenceBolt --> WordCountBolt
        
        //fieldsGrouping將含有特定數(shù)據(jù)的tuple路由到特殊的bolt實(shí)例中
        //這里fieldsGrouping()方法保證所有“word”字段相同的tuuple會(huì)被路由到同一個(gè)WordCountBolt實(shí)例中
        //builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));
        
        //WordCountBolt單詞計(jì)數(shù)器設(shè)置4個(gè)Executeor(線程)
        builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));
        
        // WordCountBolt --> ReportBolt
        
        //globalGrouping是把WordCountBolt發(fā)射的所有tuple路由到唯一的ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
        
        
        Config config = new Config();//Config類是一個(gè)HashMap<String,Object>的子類兔毙,用來配置topology運(yùn)行時(shí)的行為
        //設(shè)置worker數(shù)量
        //config.setNumWorkers(2);
        LocalCluster cluster = new LocalCluster();
        
        //本地提交
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        
        Utils.sleep(10000);
        cluster.killTopology(TOPOLOGY_NAME);        
        cluster.shutdown();
        
    }
}

運(yùn)行程序唾琼,可看到單詞計(jì)數(shù)實(shí)時(shí)輸出效果

實(shí)時(shí)輸出

運(yùn)行10秒后生成報(bào)告

單詞計(jì)數(shù)生成報(bào)告
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市澎剥,隨后出現(xiàn)的幾起案子锡溯,更是在濱河造成了極大的恐慌,老刑警劉巖哑姚,帶你破解...
    沈念sama閱讀 218,640評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件祭饭,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡叙量,警方通過查閱死者的電腦和手機(jī)倡蝙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绞佩,“玉大人寺鸥,你說我怎么就攤上這事∑飞剑” “怎么了析既?”我有些...
    開封第一講書人閱讀 165,011評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長谆奥。 經(jīng)常有香客問我眼坏,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,755評(píng)論 1 294
  • 正文 為了忘掉前任宰译,我火速辦了婚禮檐蚜,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘沿侈。我一直安慰自己闯第,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,774評(píng)論 6 392
  • 文/花漫 我一把揭開白布缀拭。 她就那樣靜靜地躺著咳短,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蛛淋。 梳的紋絲不亂的頭發(fā)上咙好,一...
    開封第一講書人閱讀 51,610評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音褐荷,去河邊找鬼勾效。 笑死,一個(gè)胖子當(dāng)著我的面吹牛叛甫,可吹牛的內(nèi)容都是我干的层宫。 我是一名探鬼主播,決...
    沈念sama閱讀 40,352評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼其监,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼萌腿!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起抖苦,我...
    開封第一講書人閱讀 39,257評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤毁菱,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后睛约,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鼎俘,經(jīng)...
    沈念sama閱讀 45,717評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,894評(píng)論 3 336
  • 正文 我和宋清朗相戀三年辩涝,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了贸伐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,021評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡怔揩,死狀恐怖捉邢,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情商膊,我是刑警寧澤伏伐,帶...
    沈念sama閱讀 35,735評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站晕拆,受9級(jí)特大地震影響藐翎,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,354評(píng)論 3 330
  • 文/蒙蒙 一吝镣、第九天 我趴在偏房一處隱蔽的房頂上張望堤器。 院中可真熱鬧,春花似錦末贾、人聲如沸闸溃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,936評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽辉川。三九已至,卻和暖如春拴测,著一層夾襖步出監(jiān)牢的瞬間乓旗,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,054評(píng)論 1 270
  • 我被黑心中介騙來泰國打工昼扛, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留寸齐,地道東北人欲诺。 一個(gè)月前我還...
    沈念sama閱讀 48,224評(píng)論 3 371
  • 正文 我出身青樓抄谐,卻偏偏與公主長得像,于是被迫代替她去往敵國和親扰法。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蛹含,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,974評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容