Storm入門例子詳解-單詞計(jì)數(shù)器
概念
Storm 分布式計(jì)算結(jié)構(gòu)稱為 topology(拓?fù)洌┥髁辏?stream(數(shù)據(jù)流)示血, spout(數(shù)據(jù)流的生成者), bolt(運(yùn)算)組成。
Storm 的核心數(shù)據(jù)結(jié)構(gòu)是 tuple祠墅。 tuple是 包 含 了 一 個(gè) 或 者 多 個(gè) 鍵 值 對(duì) 的 列 表,Stream 是 由 無 限 制 的 tuple 組 成 的 序 列歌径。
spout 代表了一個(gè) Storm topology 的主要數(shù)據(jù)入口毁嗦,充當(dāng)采集器的角色,連接到數(shù)據(jù)源回铛,將數(shù)據(jù)轉(zhuǎn)化為一個(gè)個(gè) tuple狗准,并將 tuple 作為數(shù)據(jù)流進(jìn)行發(fā)射克锣。
bolt 可以理解為計(jì)算程序中的運(yùn)算或者函數(shù),將一個(gè)或者多個(gè)數(shù)據(jù)流作為輸入腔长,對(duì)數(shù)據(jù)實(shí)施運(yùn)算后袭祟,選擇性地輸出一個(gè)或者多個(gè)數(shù)據(jù)流。 bolt 可以訂閱多個(gè)由 spout 或者其他bolt 發(fā)射的數(shù)據(jù)流捞附,這樣就可以建立復(fù)雜的數(shù)據(jù)流轉(zhuǎn)換網(wǎng)絡(luò)巾乳。
本例子單詞計(jì)數(shù) topology 的數(shù)據(jù)流大概是這樣:
項(xiàng)目搭建
新建類SentenceSpout.java(數(shù)據(jù)流生成者)
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
/**
* 向后端發(fā)射tuple數(shù)據(jù)流
* @author soul
*
*/
public class SentenceSpout extends BaseRichSpout {
//BaseRichSpout是ISpout接口和IComponent接口的簡單實(shí)現(xiàn),接口對(duì)用不到的方法提供了默認(rèn)的實(shí)現(xiàn)
private SpoutOutputCollector collector;
private String[] sentences = {
"my name is soul",
"im a boy",
"i have a dog",
"my dog has fleas",
"my girl friend is beautiful"
};
private int index=0;
/**
* open()方法中是ISpout接口中定義鸟召,在Spout組件初始化時(shí)被調(diào)用胆绊。
* open()接受三個(gè)參數(shù):一個(gè)包含Storm配置的Map,一個(gè)TopologyContext對(duì)象,提供了topology中組件的信息,SpoutOutputCollector對(duì)象提供發(fā)射tuple的方法欧募。
* 在這個(gè)例子中,我們不需要執(zhí)行初始化,只是簡單的存儲(chǔ)在一個(gè)SpoutOutputCollector實(shí)例變量压状。
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
// TODO Auto-generated method stub
this.collector = collector;
}
/**
* nextTuple()方法是任何Spout實(shí)現(xiàn)的核心。
* Storm調(diào)用這個(gè)方法跟继,向輸出的collector發(fā)出tuple何缓。
* 在這里,我們只是發(fā)出當(dāng)前索引的句子,并增加該索引準(zhǔn)備發(fā)射下一個(gè)句子还栓。
*/
public void nextTuple() {
//collector.emit(new Values("hello world this is a test"));
// TODO Auto-generated method stub
this.collector.emit(new Values(sentences[index]));
index++;
if (index>=sentences.length) {
index=0;
}
Utils.sleep(1);
}
/**
* declareOutputFields是在IComponent接口中定義的碌廓,所有Storm的組件(spout和bolt)都必須實(shí)現(xiàn)這個(gè)接口
* 用于告訴Storm流組件將會(huì)發(fā)出那些數(shù)據(jù)流,每個(gè)流的tuple將包含的字段
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("sentence"));//告訴組件發(fā)出數(shù)據(jù)流包含sentence字段
}
}
新建類SplitSentenceBolt.java(單詞分割器)
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* 訂閱sentence spout發(fā)射的tuple流剩盒,實(shí)現(xiàn)分割單詞
* @author soul
*
*/
public class SplitSentenceBolt extends BaseRichBolt {
//BaseRichBolt是IComponent和IBolt接口的實(shí)現(xiàn)
//繼承這個(gè)類谷婆,就不用去實(shí)現(xiàn)本例不關(guān)心的方法
private OutputCollector collector;
/**
* prepare()方法類似于ISpout 的open()方法。
* 這個(gè)方法在blot初始化時(shí)調(diào)用辽聊,可以用來準(zhǔn)備bolt用到的資源,比如數(shù)據(jù)庫連接纪挎。
* 本例子和SentenceSpout類一樣,SplitSentenceBolt類不需要太多額外的初始化,
* 所以prepare()方法只保存OutputCollector對(duì)象的引用。
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// TODO Auto-generated method stub
this.collector=collector;
}
/**
* SplitSentenceBolt核心功能是在類IBolt定義execute()方法跟匆,這個(gè)方法是IBolt接口中定義异袄。
* 每次Bolt從流接收一個(gè)訂閱的tuple,都會(huì)調(diào)用這個(gè)方法玛臂。
* 本例中,收到的元組中查找“sentence”的值,
* 并將該值拆分成單個(gè)的詞,然后按單詞發(fā)出新的tuple烤蜕。
*/
public void execute(Tuple input) {
// TODO Auto-generated method stub
String sentence = input.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
this.collector.emit(new Values(word));//向下一個(gè)bolt發(fā)射數(shù)據(jù)
}
}
/**
* plitSentenceBolt類定義一個(gè)元組流,每個(gè)包含一個(gè)字段(“word”)。
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("word"));
}
}
新建類WordCountBolt.java(單詞計(jì)數(shù)器)
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* 訂閱 split sentence bolt的輸出流迹冤,實(shí)現(xiàn)單詞計(jì)數(shù)讽营,并發(fā)送當(dāng)前計(jì)數(shù)給下一個(gè)bolt
* @author soul
*
*/
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
//存儲(chǔ)單詞和對(duì)應(yīng)的計(jì)數(shù)
private HashMap<String, Long> counts = null;//注:不可序列化對(duì)象需在prepare中實(shí)例化
/**
* 大部分實(shí)例變量通常是在prepare()中進(jìn)行實(shí)例化,這個(gè)設(shè)計(jì)模式是由topology的部署方式?jīng)Q定的
* 因?yàn)樵诓渴鹜負(fù)鋾r(shí),組件spout和bolt是在網(wǎng)絡(luò)上發(fā)送的序列化的實(shí)例變量泡徙。
* 如果spout或bolt有任何non-serializable實(shí)例變量在序列化之前被實(shí)例化(例如,在構(gòu)造函數(shù)中創(chuàng)建)
* 會(huì)拋出NotSerializableException并且拓?fù)鋵o法發(fā)布橱鹏。
* 本例中因?yàn)镠ashMap 是可序列化的,所以可以安全地在構(gòu)造函數(shù)中實(shí)例化。
* 但是,通常情況下最好是在構(gòu)造函數(shù)中對(duì)基本數(shù)據(jù)類型和可序列化的對(duì)象進(jìn)行復(fù)制和實(shí)例化
* 而在prepare()方法中對(duì)不可序列化的對(duì)象進(jìn)行實(shí)例化莉兰。
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// TODO Auto-generated method stub
this.collector = collector;
this.counts = new HashMap<String, Long>();
}
/**
* 在execute()方法中,我們查找的收到的單詞的計(jì)數(shù)(如果不存在挑围,初始化為0)
* 然后增加計(jì)數(shù)并存儲(chǔ),發(fā)出一個(gè)新的詞和當(dāng)前計(jì)數(shù)組成的二元組。
* 發(fā)射計(jì)數(shù)作為流允許拓?fù)涞钠渌鸼olt訂閱和執(zhí)行額外的處理糖荒。
*/
public void execute(Tuple input) {
// TODO Auto-generated method stub
String word = input.getStringByField("word");
Long count = this.counts.get(word);
if (count == null) {
count = 0L;//如果不存在杉辙,初始化為0
}
count++;//增加計(jì)數(shù)
this.counts.put(word, count);//存儲(chǔ)計(jì)數(shù)
this.collector.emit(new Values(word,count));
}
/**
*
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
//聲明一個(gè)輸出流,其中tuple包括了單詞和對(duì)應(yīng)的計(jì)數(shù)寂嘉,向后發(fā)射
//其他bolt可以訂閱這個(gè)數(shù)據(jù)流進(jìn)一步處理
declarer.declare(new Fields("word","count"));
}
}
新建類ReportBolt.java(報(bào)告生成器)
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
/**
* 生成一份報(bào)告
* @author soul
*
*/
public class ReportBolt extends BaseRichBolt {
private HashMap<String, Long> counts = null;//保存單詞和對(duì)應(yīng)的計(jì)數(shù)
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// TODO Auto-generated method stub
this.counts = new HashMap<String, Long>();
}
public void execute(Tuple input) {
// TODO Auto-generated method stub
String word = input.getStringByField("word");
Long count = input.getLongByField("count");
this.counts.put(word, count);
//實(shí)時(shí)輸出
System.out.println("結(jié)果:"+this.counts);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
//這里是末端bolt奏瞬,不需要發(fā)射數(shù)據(jù)流枫绅,這里無需定義
}
/**
* cleanup是IBolt接口中定義
* Storm在終止一個(gè)bolt之前會(huì)調(diào)用這個(gè)方法
* 本例我們利用cleanup()方法在topology關(guān)閉時(shí)輸出最終的計(jì)數(shù)結(jié)果
* 通常情況下泉孩,cleanup()方法用來釋放bolt占用的資源,如打開的文件句柄或數(shù)據(jù)庫連接
* 但是當(dāng)Storm拓?fù)湓谝粋€(gè)集群上運(yùn)行并淋,IBolt.cleanup()方法不能保證執(zhí)行(這里是開發(fā)模式寓搬,生產(chǎn)環(huán)境不要這樣做)。
*/
public void cleanup(){
System.out.println("---------- FINAL COUNTS -----------");
ArrayList<String> keys = new ArrayList<String>();
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for(String key : keys){
System.out.println(key + " : " + this.counts.get(key));
}
System.out.println("----------------------------");
}
}
修改程序主入口App.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
/**
* 實(shí)現(xiàn)單詞計(jì)數(shù)topology
*
*/
public class App
{
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String REPORT_BOLT_ID = "report-bolt";
private static final String TOPOLOGY_NAME = "word-count-topology";
public static void main( String[] args ) //throws Exception
{
//System.out.println( "Hello World!" );
//實(shí)例化spout和bolt
SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();//創(chuàng)建了一個(gè)TopologyBuilder實(shí)例
//TopologyBuilder提供流式風(fēng)格的API來定義topology組件之間的數(shù)據(jù)流
//builder.setSpout(SENTENCE_SPOUT_ID, spout);//注冊(cè)一個(gè)sentence spout
//設(shè)置兩個(gè)Executeor(線程)县耽,默認(rèn)一個(gè)
builder.setSpout(SENTENCE_SPOUT_ID, spout,2);
// SentenceSpout --> SplitSentenceBolt
//注冊(cè)一個(gè)bolt并訂閱sentence發(fā)射出的數(shù)據(jù)流句喷,shuffleGrouping方法告訴Storm要將SentenceSpout發(fā)射的tuple隨機(jī)均勻的分發(fā)給SplitSentenceBolt的實(shí)例
//builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
//SplitSentenceBolt單詞分割器設(shè)置4個(gè)Task,2個(gè)Executeor(線程)
builder.setBolt(SPLIT_BOLT_ID, splitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
// SplitSentenceBolt --> WordCountBolt
//fieldsGrouping將含有特定數(shù)據(jù)的tuple路由到特殊的bolt實(shí)例中
//這里fieldsGrouping()方法保證所有“word”字段相同的tuuple會(huì)被路由到同一個(gè)WordCountBolt實(shí)例中
//builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));
//WordCountBolt單詞計(jì)數(shù)器設(shè)置4個(gè)Executeor(線程)
builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));
// WordCountBolt --> ReportBolt
//globalGrouping是把WordCountBolt發(fā)射的所有tuple路由到唯一的ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
Config config = new Config();//Config類是一個(gè)HashMap<String,Object>的子類兔毙,用來配置topology運(yùn)行時(shí)的行為
//設(shè)置worker數(shù)量
//config.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
//本地提交
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
}
}
運(yùn)行程序唾琼,可看到單詞計(jì)數(shù)實(shí)時(shí)輸出效果
運(yùn)行10秒后生成報(bào)告