JStorm:單詞計數(shù)-開發(fā)示例

JStorm:1怯晕、概念與編程模型
JStorm:2缸棵、任務(wù)調(diào)度

轉(zhuǎn)載自個人博客
示例功能說明:統(tǒng)計單詞出現(xiàn)的次數(shù),spout將持續(xù)輸入的一句句話作為輸入流吧凉,bolt將一句話分割成單詞踏志,最后統(tǒng)計每個單詞出現(xiàn)的次數(shù)。

示例介紹

如下圖所示针余,單詞計數(shù)topology由一個spout和下游三個bolt組成赏廓。


SentenceSpout:向后端發(fā)射一個單值tuple組成的數(shù)據(jù)流傍妒,鍵名“sentence”,tuple如下:
{“sentence”:“my name is zhangsan”}
SplitSentenceBolt:訂閱SentenceSpout發(fā)射的數(shù)據(jù)流颤练,將“sentence”中的語句分割為一個個單詞,向后端發(fā)射“word”組成的tuple如下:
{“word”:“my”}
{“word”:“name”}
{“word”:“is”}
{“word”:“zhangsan”}
WordCountBolt:訂閱SplitSentenceBolt發(fā)射的數(shù)據(jù)流患雇,保存每個特定單詞出現(xiàn)的次數(shù)宇挫,每當(dāng)bolt收到一個tuple,將對應(yīng)單詞的計數(shù)加一器瘪,并想后發(fā)射該單詞當(dāng)前的計數(shù)。
{“word”:“my”,“count”:“5”}
ReportBolt:訂閱WordCountBolt的輸出流橡疼,維護(hù)一份所有單詞對應(yīng)的計數(shù)表,結(jié)束時將所有值打印住拭。

代碼實現(xiàn)

添加Pom.xml依賴

<dependency>
        <groupId>com.alibaba.jstorm</groupId>
        <artifactId>jstorm-core</artifactId>
        <version>2.2.1</version>
        <!-- <scope>provided</scope> -->
</dependency>

SentenceSpout:繼承BaseRichSpout類历帚,在nextTuple方法中生成并向后發(fā)射數(shù)據(jù)流,declareOutputFields方法定義了向后發(fā)射數(shù)據(jù)流tuple的字段名為:sentence挽牢。
SplitSentenceBolt:繼承BaseRichBolt類,在execute方法中將接收到的tuple分割為單詞卓研,并向后傳輸tuple,declareOutputFields定義了tuple字段為word寥闪。
WordCountBolt:繼承BaseRichBolt磨淌,在execute方法中統(tǒng)計單詞出現(xiàn)的次數(shù),本地使用HashMap保存所有單詞出現(xiàn)的次數(shù)梁只。接收到tuple后更新該單詞出現(xiàn)的次數(shù)并向后傳輸tuple埃脏,declareOutputFields定義了tuple為"word", "count"秋忙。
ReportBolt:繼承BaseRichBolt類,在execute方法中匯總所有單詞出現(xiàn)的次數(shù)灰追。本地使用HashMap保存所有單詞出現(xiàn)的次數(shù)。當(dāng)任務(wù)結(jié)束時朴下,Cleanup方法打印統(tǒng)計結(jié)果。
WordCountTopology:創(chuàng)建topology殴胧,定義了Spout以及Bolt之間數(shù)據(jù)流傳輸?shù)囊?guī)則佩迟,以及并發(fā)數(shù)(前后并發(fā)為2、2音五、4羔沙、1)。進(jìn)程(worker)扼雏、線程(Executor)與Task之間的關(guān)系如下圖:


核心代碼參考如下诗充,注意其中的shuffleGrouping設(shè)定向后傳輸數(shù)據(jù)流為隨機(jī)苍蔬,fieldsGrouping按照字段值向后傳輸數(shù)據(jù)流蝴蜓,能保證同一個單詞由同一個WordCountBolt統(tǒng)計,而globalGrouping保證匯總的bolt是單例格仲。

WordCountTopology.java

//WordCountTopology代碼
import storm.blueprints.word.v1.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import static storm.blueprints.utils.Utils.*;

public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {

        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();


        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        // SentenceSpout --> SplitSentenceBolt
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)
                .setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        // SplitSentenceBolt --> WordCountBolt
        builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // WordCountBolt --> ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt)
                .globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();
        config.setNumWorkers(2);

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        waitForSeconds(10);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

SentenceSpout.java

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.blueprints.utils.Utils;

import java.util.Map;

public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };
    private int index = 0;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context, 
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.waitForMillis(1000);
    }
}

SplitSentenceBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt{
    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            this.collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordCountBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt{
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, 
            OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if(count == null){
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

ReportBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReportBolt extends BaseRichBolt {

    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit anything
    }

    @Override
    public void cleanup() {
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("--------------");
    }
}

Utils.java

public class Utils {

    public static void waitForSeconds(int seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
        }
    }

    public static void waitForMillis(long milliseconds) {
        try {
            Thread.sleep(milliseconds);
        } catch (InterruptedException e) {
        }
    }
}

轉(zhuǎn)載請標(biāo)明出處

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末汽馋,一起剝皮案震驚了整個濱河市圈盔,隨后出現(xiàn)的幾起案子悄雅,更是在濱河造成了極大的恐慌驱敲,老刑警劉巖宽闲,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異便锨,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)放案,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來掸冤,“玉大人,你說我怎么就攤上這事稿湿⊙荷蓿” “怎么了饺藤?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵流礁,是天一觀的道長。 經(jīng)常有香客問我再姑,道長,這世上最難降的妖魔是什么元镀? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任霎桅,我火速辦了婚禮栖疑,結(jié)果婚禮上哆档,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好比原,可當(dāng)我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布杠巡。 她就那樣靜靜地躺著,像睡著了一般氢拥。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上嫩海,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天,我揣著相機(jī)與錄音审葬,去河邊找鬼。 笑死奕谭,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的血柳。 我是一名探鬼主播,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼难捌,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了栖榨?” 一聲冷哼從身側(cè)響起昆汹,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤明刷,失蹤者是張志新(化名)和其女友劉穎婴栽,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體辈末,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡愚争,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了挤聘。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片轰枝。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖组去,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤诚撵,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布缭裆,位于F島的核電站,受9級特大地震影響寿烟,放射性物質(zhì)發(fā)生泄漏澈驼。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一筛武、第九天 我趴在偏房一處隱蔽的房頂上張望缝其。 院中可真熱鬧,春花似錦徘六、人聲如沸内边。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽假残。三九已至,卻和暖如春炉擅,著一層夾襖步出監(jiān)牢的瞬間辉懒,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工谍失, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留眶俩,地道東北人。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓快鱼,卻偏偏與公主長得像颠印,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子抹竹,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 一. wordCount Topology開發(fā): 1.spout數(shù)據(jù)收集器(SentenceSpout類): 有...
    奉先閱讀 1,179評論 0 0
  • 原文鏈接Storm Tutorial 本人原創(chuàng)翻譯线罕,轉(zhuǎn)載請注明出處 這個教程內(nèi)容包含如何創(chuàng)建topologies及...
    quiterr閱讀 1,606評論 0 6
  • Date: Nov 17-24, 2017 1. 目的 積累Storm為主的流式大數(shù)據(jù)處理平臺對實時數(shù)據(jù)處理的相關(guān)...
    一只很努力爬樹的貓閱讀 2,158評論 0 4
  • 這是一個JStorm使用教程,不包含環(huán)境搭建教程窃判,直接在公司現(xiàn)有集群上跑任務(wù)钞楼,關(guān)于JStorm集群環(huán)境搭建,后續(xù)研...
    Coselding閱讀 6,298評論 1 9
  • 在愛情這場博弈中袄琳, 從不曾有勝負(fù)之分询件, 要么兩敗俱傷, 要么皆大歡喜唆樊, 無論怎樣的結(jié)局宛琅, 我們都不曾后悔, 只因我...
    夏藜若閱讀 137評論 2 0