JStorm (1): Concepts and Programming Model
JStorm (2): Task Scheduling
Reposted from my personal blog.
What this example does: it counts word occurrences. The spout turns a continuous feed of sentences into the input stream, a bolt splits each sentence into words, and the occurrences of each word are then counted downstream.
Example Overview
As shown in the figure below, the word-count topology consists of one spout and three downstream bolts.
SentenceSpout: emits a stream of single-field tuples downstream, keyed by "sentence". A tuple looks like this:
{"sentence": "my name is zhangsan"}
SplitSentenceBolt: subscribes to the stream emitted by SentenceSpout, splits each "sentence" into individual words, and emits "word" tuples downstream:
{"word": "my"}
{"word": "name"}
{"word": "is"}
{"word": "zhangsan"}
WordCountBolt: subscribes to the stream emitted by SplitSentenceBolt and keeps a running count for each distinct word. Every time the bolt receives a tuple, it increments the count of the corresponding word and emits that word's current count downstream:
{"word": "my", "count": 5}
ReportBolt: subscribes to the output stream of WordCountBolt, maintains a table with the count of every word, and prints all the values when the topology ends.
Code Implementation
Add the jstorm-core dependency to pom.xml (for submission to a real cluster, the commented-out provided scope is typically enabled so that the jstorm-core jar already on the cluster is used instead of being bundled):
<dependency>
    <groupId>com.alibaba.jstorm</groupId>
    <artifactId>jstorm-core</artifactId>
    <version>2.2.1</version>
    <!-- <scope>provided</scope> -->
</dependency>
SentenceSpout: extends BaseRichSpout. The nextTuple method generates the data stream and emits it downstream; declareOutputFields declares the field name of the emitted tuples to be sentence.
SplitSentenceBolt: extends BaseRichBolt. The execute method splits each received tuple into words and emits them downstream; declareOutputFields declares the tuple field to be word.
WordCountBolt: extends BaseRichBolt. The execute method counts word occurrences, keeping every word's count in a local HashMap. When a tuple is received, the corresponding word's count is updated and a new tuple is emitted downstream; declareOutputFields declares the tuple fields to be "word" and "count".
ReportBolt: extends BaseRichBolt. The execute method aggregates the occurrence counts of all words in a local HashMap. When the topology is shut down, the cleanup method prints the final statistics.
WordCountTopology: builds the topology, defining how data streams are routed between the spout and the bolts as well as the parallelism of each component (2, 2, 4, and 1 from upstream to downstream, i.e. 9 executors in total; SplitSentenceBolt additionally sets 4 tasks, so each of its 2 executors runs 2 tasks). The relationship between processes (workers), threads (executors), and tasks is shown in the figure below:
The core code follows. Note that shuffleGrouping distributes tuples to downstream tasks at random; fieldsGrouping routes tuples by field value, which guarantees that the same word is always counted by the same WordCountBolt task; and globalGrouping sends every tuple to a single ReportBolt instance for the final summary.
WordCountTopology.java
// WordCountTopology: wires the spout and bolts together and runs the topology in local mode.
import storm.blueprints.word.v1.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import static storm.blueprints.utils.Utils.*;

public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();

        // SentenceSpout: 2 executors
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        // SentenceSpout --> SplitSentenceBolt: random distribution, 2 executors sharing 4 tasks
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)
                .setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        // SplitSentenceBolt --> WordCountBolt: group by "word" so the same word always reaches the same task
        builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // WordCountBolt --> ReportBolt: every tuple goes to the single reporting task
        builder.setBolt(REPORT_BOLT_ID, reportBolt)
                .globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();
        config.setNumWorkers(2);

        // Run in local mode for 10 seconds, then kill the topology and shut down.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        waitForSeconds(10);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}
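The topology above runs in local mode via LocalCluster, which is convenient for a quick test. As a rough sketch of my own (not part of the original post), submitting the same topology to a real JStorm cluster would instead go through StormSubmitter; the ClusterSubmit class name below is purely illustrative, and the jar would typically be packaged and launched with the jstorm client (something like jstorm jar word-count.jar ClusterSubmit).

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class ClusterSubmit {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... wire up the spout and bolts exactly as in WordCountTopology ...

        Config config = new Config();
        config.setNumWorkers(2);

        // StormSubmitter ships the topology definition (and the enclosing jar) to Nimbus;
        // unlike the local-mode demo, the topology keeps running until it is killed explicitly.
        StormSubmitter.submitTopology("word-count-topology", config, builder.createTopology());
    }
}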
SentenceSpout.java
package storm.blueprints.word.v1;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.blueprints.utils.Utils;

import java.util.Map;

public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private String[] sentences = {
            "my dog has fleas",
            "i like cold beverages",
            "the dog ate my homework",
            "don't have a cow man",
            "i don't think i like fleas"
    };
    private int index = 0;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        // Emit the next sentence, cycling through the fixed array at one tuple per second.
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.waitForMillis(1000);
    }
}
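A note on reliability that is not in the original example: SentenceSpout emits tuples without a message ID, so JStorm cannot ack, fail, or replay them. Below is a minimal sketch of a reliable variant, written as fields and methods added to the class above; the msgId counter and the pending map are my own illustrative names (the map also needs a java.util.HashMap import). The ack and fail callbacks only fire if the downstream bolts anchor and ack their tuples, as sketched after SplitSentenceBolt below.

private long msgId = 0;
private Map<Long, Values> pending = new HashMap<Long, Values>();

public void nextTuple() {
    Values values = new Values(sentences[index]);
    pending.put(msgId, values);
    // The second argument is the message ID that makes the tuple trackable.
    this.collector.emit(values, msgId);
    msgId++;
    index = (index + 1) % sentences.length;
    Utils.waitForMillis(1000);
}

@Override
public void ack(Object id) {
    // The tuple tree rooted at this message was fully processed downstream.
    pending.remove(id);
}

@Override
public void fail(Object id) {
    // Replay the failed sentence under the same message ID.
    this.collector.emit(pending.get(id), id);
}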
SplitSentenceBolt.java
package storm.blueprints.word.v1;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {

    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // Split the incoming sentence on spaces and emit one tuple per word.
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
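A related note of my own: the bolts in this example emit unanchored tuples and never call ack, which is fine for the demo but means the reliable-spout sketch above would never see an ack or a fail. A hedged sketch of an anchored, acking execute method for SplitSentenceBolt:

public void execute(Tuple tuple) {
    String sentence = tuple.getStringByField("sentence");
    for (String word : sentence.split(" ")) {
        // Anchor each emitted word to the incoming sentence tuple so a failure
        // anywhere downstream is reported back to the spout that emitted it.
        this.collector.emit(tuple, new Values(word));
    }
    // Acknowledge the input tuple once all words have been emitted.
    this.collector.ack(tuple);
}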
WordCountBolt.java
package storm.blueprints.word.v1;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {

    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        // Increment the local count for this word and emit the updated (word, count) pair.
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
ReportBolt.java
package storm.blueprints.word.v1;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReportBolt extends BaseRichBolt {

    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        // Overwrite the stored count with the latest value reported for this word.
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink and does not emit anything.
    }

    @Override
    public void cleanup() {
        // Print the final counts, sorted alphabetically, when the topology is shut down.
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("--------------");
    }
}
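One caveat to add (my note, not from the original post): cleanup is only reliably invoked in local mode; on a real cluster a worker can be killed without it ever running, so the final printout may never appear. A common alternative is to report periodically from execute using tick tuples. The sketch below assumes the standard Storm tick-tuple mechanism, which JStorm also supports; TickAwareReportBolt is a hypothetical name of mine.

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

public class TickAwareReportBolt extends ReportBolt {

    // Ask the framework to deliver a system tick tuple to this bolt every 10 seconds.
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void execute(Tuple tuple) {
        boolean isTick = Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
        if (isTick) {
            cleanup();            // reuse the printing logic as a periodic report
        } else {
            super.execute(tuple); // normal word/count tuple: update the table
        }
    }
}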
Utils.java
package storm.blueprints.utils;

public class Utils {

    // Sleep helpers used by the spout (to throttle emission) and the driver (to let the local topology run).
    public static void waitForSeconds(int seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
            // ignored for this demo
        }
    }

    public static void waitForMillis(long milliseconds) {
        try {
            Thread.sleep(milliseconds);
        } catch (InterruptedException e) {
            // ignored for this demo
        }
    }
}
Please credit the source when reposting.