docker運行storm及wordcount實例

本文簡單介紹下怎么使用docker運行storm以及在springboot中使用storm。

docker-compose

version: '2'
services:
    zookeeper:
        image: zookeeper ##3.4.10
        container_name: zookeeper
        restart: always
        ports:
          - 2181:2181

    nimbus:
        image: storm ## 1.1.1
        container_name: nimbus
        command: storm nimbus
        depends_on:
            - zookeeper
        links:
            - zookeeper
        restart: always
        ports:
            - 6627:6627

    supervisor:
        image: storm
        container_name: supervisor
        command: storm supervisor
        depends_on:
            - nimbus
            - zookeeper
        links:
            - nimbus
            - zookeeper
        restart: always
    ui:
        image: storm
        container_name: stormui
        command: storm ui
        depends_on:
          - nimbus
          - zookeeper
        links:
          - nimbus
          - zookeeper
        restart: always
        ports:
          - 8080:8080

啟動之后訪問192.168.99.100:8080就可以看見storm-ui的界面

wordcount實例

TestWordSpout

public class TestWordSpout extends BaseRichSpout {
    public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
        
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    
    public void close() {
        
    }
        
    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }
    
    public void ack(Object msgId) {

    }

    public void fail(Object msgId) {
        
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        if(!_isDistributed) {
            Map<String, Object> ret = new HashMap<String, Object>();
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
            return null;
        }
    }    
}

WordCountBolt

public class WordCountBolt extends BaseBasicBolt {

    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) count = 0;
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

PrintBolt

public class PrintBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String first = tuple.getString(0);
        int second = tuple.getInteger(1);
        System.out.println(first + "," + second);
    }
}

本地運行

@SpringBootApplication
public class StormDemoApplication implements CommandLineRunner{

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication((StormDemoApplication.class));
        app.setWebEnvironment(false);
        app.run(args);
    }

    @Override
    public void run(String... args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();
        //并發(fā)度10
        builder.setSpout("spout", new TestWordSpout(), 10);
        builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count");

        String topologyName = "DemoTopology";
        Config conf = new Config();
        conf.setDebug(true);

        //遠程提交 mvn clean package -Dmaven.test.skip=true
//      StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
        try {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf,builder.createTopology());
            Thread.sleep(60 * 1000);
            cluster.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}

遠程提交

修改提交方式二打,然后打jar包

        //遠程提交 mvn clean package -Dmaven.test.skip=true
        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());

遠程提交代碼

@Test
    public void remoteSubmit() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {

        Config conf = new Config();
        conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus連接主機地址扛伍,比如:192.168.10.1
        conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus連接端口,默認 6627
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper連接主機地址窒百,可以使用集合存放多個
        conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper連接端口黍判,默認2181
        conf.setDebug(true);
        conf.setNumWorkers(1);

        TopologyBuilder builder = new TopologyBuilder();
        //并發(fā)度10
        builder.setSpout("spout", new TestWordSpout(), 10);
        builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count");

        String topologyName = "DemoTopology";

        //非常關鍵的一步,使用StormSubmitter提交拓撲時篙梢,不管怎么樣顷帖,都是需要將所需的jar提交到nimbus上去,如果不指定jar文件路徑渤滞,
        //storm默認會使用System.getProperty("storm.jar")去取贬墩,如果不設定,就不能提交
        System.setProperty("storm.jar","/Users/downloads/storm-demo-0.0.1-SNAPSHOT.jar");
        StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
    }

clojars倉庫問題

修改~/.m2/settings.xml

    <mirrors>
        <mirror>
            <id>nexus-aliyun</id>
            <mirrorOf>*,!Clojars</mirrorOf>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>

    <repository>
           <id>Clojars</id>
           <name>Clojars Repository</name>
           <url>http://clojars.org/repo/</url>
           <releases><enabled>true</enabled></releases>
           <snapshots><enabled>true</enabled></snapshots>
    </repository>

doc

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末妄呕,一起剝皮案震驚了整個濱河市陶舞,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌绪励,老刑警劉巖肿孵,帶你破解...
    沈念sama閱讀 212,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異疏魏,居然都是意外死亡停做,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評論 3 385
  • 文/潘曉璐 我一進店門蠢护,熙熙樓的掌柜王于貴愁眉苦臉地迎上來雅宾,“玉大人,你說我怎么就攤上這事∶继В” “怎么了贯吓?”我有些...
    開封第一講書人閱讀 158,369評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蜀变。 經常有香客問我悄谐,道長,這世上最難降的妖魔是什么库北? 我笑而不...
    開封第一講書人閱讀 56,799評論 1 285
  • 正文 為了忘掉前任爬舰,我火速辦了婚禮,結果婚禮上寒瓦,老公的妹妹穿的比我還像新娘情屹。我一直安慰自己,他們只是感情好杂腰,可當我...
    茶點故事閱讀 65,910評論 6 386
  • 文/花漫 我一把揭開白布垃你。 她就那樣靜靜地躺著,像睡著了一般喂很。 火紅的嫁衣襯著肌膚如雪惜颇。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,096評論 1 291
  • 那天少辣,我揣著相機與錄音凌摄,去河邊找鬼。 笑死漓帅,一個胖子當著我的面吹牛锨亏,可吹牛的內容都是我干的。 我是一名探鬼主播煎殷,決...
    沈念sama閱讀 39,159評論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼屯伞,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了豪直?” 一聲冷哼從身側響起劣摇,我...
    開封第一講書人閱讀 37,917評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎弓乙,沒想到半個月后末融,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 44,360評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡暇韧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,673評論 2 327
  • 正文 我和宋清朗相戀三年勾习,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片懈玻。...
    茶點故事閱讀 38,814評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡巧婶,死狀恐怖,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情艺栈,我是刑警寧澤英岭,帶...
    沈念sama閱讀 34,509評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站湿右,受9級特大地震影響诅妹,放射性物質發(fā)生泄漏。R本人自食惡果不足惜毅人,卻給世界環(huán)境...
    茶點故事閱讀 40,156評論 3 317
  • 文/蒙蒙 一吭狡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧丈莺,春花似錦划煮、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至牵现,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間邀桑,已是汗流浹背瞎疼。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留壁畸,地道東北人贼急。 一個月前我還...
    沈念sama閱讀 46,641評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像捏萍,于是被迫代替她去往敵國和親太抓。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,728評論 2 351

推薦閱讀更多精彩內容