32 storm 單詞計數(shù)

上一篇 簡單看 storm, 主要簡單講解了storm 的集群架構萍桌、核心概念、并行度凌简、流分組上炎,本篇利用 storm 結合代碼進行單詞計數(shù),講解從代碼層面理解入門storm号醉。

直接開擼代碼

單詞計數(shù)簡單的實現(xiàn)邏輯:

  • 構造一個 Spout反症,為下游 Bolt 作業(yè)提供數(shù)據(jù)源
  • 構造一個 Bolt,處理上游流向數(shù)據(jù)畔派,進行單詞切分
  • 構造一個 Bolt铅碍,處理上游 Bolt ,進行單詞計數(shù)
  • 將 Spout 线椰、Bolt 組裝起來胞谈,構建成一個拓撲(Topology)
  • 將 Topology 提交到 storm 集群,等待結果

創(chuàng)建名為 storm-wordcount 的普通 maven project

  • 在 pom.xml 引入相關類庫依賴
  <dependencies>
     <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
             <version>1.1.0</version>
                 <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>commons-collections</groupId>
          <artifactId>commons-collections</artifactId>
          <version>3.2.1</version>
        </dependency>
        <dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>22.0</version>
        </dependency>
  </dependencies>
  • 加入 plugin 憨愉,打包
<build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>test/main/java</testSourceDirectory>
        
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.sf</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.dsa</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                                <exclude>META-INF/*.rsa</exclude>
                                <exclude>META-INF/*.EC</exclude>
                                <exclude>META-INF/*.ec</exclude>
                                <exclude>META-INF/MSFTSIG.SF</exclude>
                                <exclude>META-INF/MSFTSIG.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
    
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
              <execution>
                <goals>
                  <goal>exec</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <executable>java</executable>
              <includeProjectDependencies>true</includeProjectDependencies>
              <includePluginDependencies>false</includePluginDependencies>
              <classpathScope>compile</classpathScope>
              <mainClass></mainClass>
            </configuration>
          </plugin>
        </plugins>
    </build>

新建一個 WordCountTopology 類

  • 在 WordCountTopology 中編寫一個 RandomSentenceSpout 靜態(tài)內部類烦绳,繼承實現(xiàn) BaseRichSpout 抽象類
/**
     * 
     * 編寫spout ,繼承一個基類,負責從數(shù)據(jù)源獲取數(shù)據(jù)
     * @author bill
     * @date 2017年9月16日 下午8:21:46
     */
    public static class RandomSentenceSpout extends BaseRichSpout{
        
        private static final long serialVersionUID = 6102239192526611945L;

        private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);
        
        private SpoutOutputCollector collector;
        private Random random;

        /**
         * 當一個Task被初始化的時候會調用此open方法,
         * 一般都會在此方法中對發(fā)送Tuple的對象SpoutOutputCollector和配置對象TopologyContext初始化
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.random = new Random();
        }

        /**
         * 這個spout類配紫,之前說過径密,最終會運行在task中,某個worker進程的某個executor線程內部的某個task中
         * 那個task會負責去不斷的無限循環(huán)調用nextTuple()方法
         * 只要的話呢躺孝,無限循環(huán)調用享扔,可以不斷發(fā)射最新的數(shù)據(jù)出去,形成一個數(shù)據(jù)流
         */
        public void nextTuple() {
            String[] sentences = new String[]{
                     "I used to watch her from my kitchen widow"
                    , "she seemed so small as she muscled her way through the crowd of boys on the playground"
                    , "The school was across the street from our home and I would often watch the kids as they played during recess"
                    , "A sea of children, and yet tome"
                    , "she stood out from them all"};
            String sentence = sentences[random.nextInt(sentences.length)];
            LOGGER.info(" ★★★  發(fā)射 sentence 數(shù)據(jù) > {}", sentence);  
            // 這個values植袍,你可以認為就是構建一個tuple,tuple是最小的數(shù)據(jù)單位惧眠,無限個tuple組成的流就是一個stream,通過 emit 發(fā)送數(shù)據(jù)到下游bolt tuple
            this.collector.emit(new Values(sentence));
        }

        /**
         * 用于聲明當前Spout的Tuple發(fā)送流的域名字。Stream流的定義是通過OutputFieldsDeclare.declareStream方法完成的
         * 通俗點說法:就是這個方法是定義一個你發(fā)射出去的每個tuple中的每個field的名稱是什么于个,作為下游
 bolt 中 execute 接收數(shù)據(jù) key 
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }
  • 在 WordCountTopology 中編寫一個 SplitSentenceBolt 靜態(tài)內部類氛魁,繼承實現(xiàn) BaseRichBolt 抽象類,用于處理上游 Spout 發(fā)送過來的數(shù)據(jù),這里做句子單詞切分
/**
     * 
     * 編寫一個bolt秀存,用于切分每個單詞捶码,同時把單詞發(fā)送出去
     * @author bill
     * @date 2017年9月16日 下午8:27:45
     */
    public static class SplitSentenceBolt extends BaseRichBolt{
        
        private static final long serialVersionUID = -4758047349803579486L;
        
        private OutputCollector collector;

        /**
         * 當一個Task被初始化的時候會調用此prepare方法,對于bolt來說,第一個方法或链,就是prepare方法
         * OutputCollector宙项,這個也是Bolt的這個tuple的發(fā)射器,一般都會在此方法中對發(fā)送Tuple的對象OutputCollector初始化
         */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        
        /**
         * 這是Bolt中最關鍵的一個方法,對于Tuple的處理都可以放到此方法中進行株扛。具體的發(fā)送也是通過emit方法來完成的
         * 就是說,每次接收到一條數(shù)據(jù)后汇荐,就會交給這個executor方法來執(zhí)行
         * 切分單詞
         */
        public void execute(Tuple input) {
            // 接收上游數(shù)據(jù)
            String sentence = input.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for(String word : words){
                //發(fā)射數(shù)據(jù)
                this.collector.emit(new Values(word));
            }
        }

        /**
         * 用于聲明當前bolt的Tuple發(fā)送流的域名字洞就。Stream流的定義是通過OutputFieldsDeclare.declareStream方法完成的
         * 通俗點說法:就是這個方法是定義一個你發(fā)射出去的每個tuple中的每個field的名稱是什么,作為下游 bolt 中 execute 接收數(shù)據(jù) key 
         * 定義發(fā)射出去的tuple掀淘,每個field的名稱
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }   
    }
  • 在 WordCountTopology 中編寫一個 WordCountBolt 靜態(tài)內部類旬蟋,繼承實現(xiàn) BaseRichBolt 抽象類,用于處理上游 Bolt 發(fā)送過來的數(shù)據(jù)革娄,這里做單詞計數(shù)
/**
     * 
     * 單詞次數(shù)統(tǒng)計bolt
     * @author bill
     * @date 2017年9月16日 下午8:35:00
     */
    public static class WordCountBolt extends BaseRichBolt{
        
        private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);

        private static final long serialVersionUID = -7114915627898482737L;
        
        private OutputCollector collector;
        
        Map<String,Long> countMap = Maps.newConcurrentMap();
        
        /**
         * 當一個Task被初始化的時候會調用此prepare方法,對于bolt來說倾贰,第一個方法,就是prepare方法
         * OutputCollector拦惋,這個也是Bolt的這個tuple的發(fā)射器,一般都會在此方法中對發(fā)送Tuple的對象OutputCollector初始化
         */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * 這是Bolt中最關鍵的一個方法匆浙,對于Tuple的處理都可以放到此方法中進行。具體的發(fā)送也是通過emit方法來完成的
         * 就是說厕妖,每次接收到一條數(shù)據(jù)后首尼,就會交給這個executor方法來執(zhí)行
         * 統(tǒng)計單詞
         */
        public void execute(Tuple input) {
            // 接收上游數(shù)據(jù)
            String word = input.getStringByField("word");
            Long count = countMap.get(word);
            if(null == count){
                count = 0L;
            }
            count ++;
            countMap.put(word, count);
            LOGGER.info(" ★★★  單詞計數(shù)[{}] 出現(xiàn)的次數(shù):{}", word, count); 
            //發(fā)射數(shù)據(jù)
            this.collector.emit(new Values(word,count));
        }
        
        /**
         * 用于聲明當前bolt的Tuple發(fā)送流的域名字。Stream流的定義是通過OutputFieldsDeclare.declareStream方法完成的
         * 通俗點說法:就是這個方法是定義一個你發(fā)射出去的每個tuple中的每個field的名稱是什么言秸,作為下游 bolt 中 execute 接收數(shù)據(jù) key 
         * 定義發(fā)射出去的tuple软能,每個field的名稱
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word","count"));
        }
    }
  • 構造拓撲(Topology),提交 storm 集群執(zhí)行(**注: storm 提供了本地模擬集群举畸,可以直接在代碼編輯器中編譯執(zhí)行

在 WordCountTopology 中編寫main 執(zhí)行方法查排,代碼如下:

public static void main(String[] args) {
        //去將spout和bolts組合起來,構建成一個拓撲
        TopologyBuilder builder = new TopologyBuilder();
        
        // 第一個參數(shù)的意思抄沮,就是給這個spout設置一個名字
        // 第二個參數(shù)的意思跋核,就是創(chuàng)建一個spout的對象
        // 第三個參數(shù)的意思,就是設置spout的executor有幾個
        builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
        builder.setBolt("SplitSentence", new SplitSentenceBolt(), 5)
        //為bolt 設置 幾個task
        .setNumTasks(10)
        //設置流分組策略
        .shuffleGrouping("RandomSentence");
        
        // fieldsGrouping 這個很重要合是,就是說了罪,相同的單詞,從SplitSentenceSpout發(fā)射出來時聪全,一定會進入到下游的指定的同一個task中
        // 只有這樣子泊藕,才能準確的統(tǒng)計出每個單詞的數(shù)量
        // 比如你有個單詞,hello,下游task1接收到3個hello娃圆,task2接收到2個hello
        // 通過fieldsGrouping 可以將 5個hello玫锋,全都進入一個task
        builder.setBolt("wordCount", new WordCountBolt(), 10)
        //為bolt 設置 幾個task
        .setNumTasks(20)
        //設置流分組策略
        .fieldsGrouping("SplitSentence", new Fields("word"));
        
        // 運行配置項
        Config config = new Config();
        
        //說明是在命令行執(zhí)行,打算提交到storm集群上去
        if(args != null && args.length > 0){
            /** 
             *  要想提高storm的并行度可以從三個方面來改造
             *  worker(進程)>executor(線程)>task(實例)
             *  增加work進程讼呢,增加executor線程撩鹿,增加task實例
             *  對應 supervisor.slots.port 中配置個數(shù)
             *  這里可以動態(tài)設置使用個數(shù)
             *  最好一臺機器上的一個topology只使用一個worker,主要原因時減少了worker之間的數(shù)據(jù)傳輸
             *  
             *  注意:如果worker使用完的話再提交topology就不會執(zhí)行,因為沒有可用的worker悦屏,只能處于等待狀態(tài)节沦,把之前運行的topology停止一個之后這個就會繼續(xù)執(zhí)行了
             */
            config.setNumWorkers(3);
            try {
                // 將Topolog提交集群
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }else{
            // 說明是在eclipse里面本地運行
            
            // 用本地模式運行1個拓撲時,用來限制生成的線程的數(shù)量
            config.setMaxTaskParallelism(20);
            
            // 將Topolog提交本地集群
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordCountTopology", config, builder.createTopology());
            
            // 為了測試模擬等待
            Utils.sleep(60000);
            // 執(zhí)行完畢甫贯,關閉cluster
            cluster.shutdown();
        }
    }
  • 運行main 方法,查看效果
運行效果

總結:本次demo看蚜,講解了如何從代碼層面理解storm 的集群架構叫搁、核心概念、并行度供炎、流分組渴逻,結合上一篇文章,同時展現(xiàn)了Spout 到 Bolt音诫,Bolt 到 Bolt 的通信

以上就是本章內容惨奕,如有不對的地方,請多多指教纽竣,謝謝墓贿!

為了方便有需要的人,本系列全部軟件都在 https://pan.baidu.com/s/1qYsJZfY

下章預告:主要講解 storm 集群搭建

代碼地址附上:https://github.com/bill5/cache-project/tree/master/storm-wordcount

作者:逐暗者 (轉載請注明出處)

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末蜓氨,一起剝皮案震驚了整個濱河市聋袋,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌穴吹,老刑警劉巖幽勒,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異港令,居然都是意外死亡啥容,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門顷霹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來咪惠,“玉大人,你說我怎么就攤上這事淋淀∫C粒” “怎么了?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長炭臭。 經(jīng)常有香客問我永脓,道長,這世上最難降的妖魔是什么鞋仍? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任常摧,我火速辦了婚禮,結果婚禮上威创,老公的妹妹穿的比我還像新娘落午。我一直安慰自己,他們只是感情好肚豺,可當我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布板甘。 她就那樣靜靜地躺著,像睡著了一般详炬。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上寞奸,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天呛谜,我揣著相機與錄音,去河邊找鬼枪萄。 笑死隐岛,一個胖子當著我的面吹牛,可吹牛的內容都是我干的瓷翻。 我是一名探鬼主播聚凹,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼齐帚!你這毒婦竟也來了妒牙?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤对妄,失蹤者是張志新(化名)和其女友劉穎湘今,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體剪菱,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡摩瞎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了孝常。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片旗们。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖构灸,靈堂內的尸體忽然破棺而出上渴,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布驰贷,位于F島的核電站盛嘿,受9級特大地震影響,放射性物質發(fā)生泄漏括袒。R本人自食惡果不足惜次兆,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望锹锰。 院中可真熱鬧芥炭,春花似錦、人聲如沸恃慧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽痢士。三九已至彪薛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間怠蹂,已是汗流浹背善延。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留城侧,地道東北人易遣。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像嫌佑,于是被迫代替她去往敵國和親豆茫。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內容