上一篇 簡單看 storm, 主要簡單講解了storm 的集群架構萍桌、核心概念、并行度凌简、流分組上炎,本篇利用 storm 結合代碼進行單詞計數(shù),講解從代碼層面理解入門storm号醉。
直接開擼代碼
單詞計數(shù)簡單的實現(xiàn)邏輯:
- 構造一個 Spout反症,為下游 Bolt 作業(yè)提供數(shù)據(jù)源
- 構造一個 Bolt,處理上游流向數(shù)據(jù)畔派,進行單詞切分
- 構造一個 Bolt铅碍,處理上游 Bolt ,進行單詞計數(shù)
- 將 Spout 线椰、Bolt 組裝起來胞谈,構建成一個拓撲(Topology)
- 將 Topology 提交到 storm 集群,等待結果
創(chuàng)建名為 storm-wordcount 的普通 maven project
- 在 pom.xml 引入相關類庫依賴
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
</dependencies>
- 加入 plugin 憨愉,打包
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>test/main/java</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.sf</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.dsa</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.rsa</exclude>
<exclude>META-INF/*.EC</exclude>
<exclude>META-INF/*.ec</exclude>
<exclude>META-INF/MSFTSIG.SF</exclude>
<exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass></mainClass>
</configuration>
</plugin>
</plugins>
</build>
新建一個 WordCountTopology 類
- 在 WordCountTopology 中編寫一個 RandomSentenceSpout 靜態(tài)內部類烦绳,繼承實現(xiàn) BaseRichSpout 抽象類
/**
*
* 編寫spout ,繼承一個基類,負責從數(shù)據(jù)源獲取數(shù)據(jù)
* @author bill
* @date 2017年9月16日 下午8:21:46
*/
public static class RandomSentenceSpout extends BaseRichSpout{
private static final long serialVersionUID = 6102239192526611945L;
private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);
private SpoutOutputCollector collector;
private Random random;
/**
* 當一個Task被初始化的時候會調用此open方法,
* 一般都會在此方法中對發(fā)送Tuple的對象SpoutOutputCollector和配置對象TopologyContext初始化
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.random = new Random();
}
/**
* 這個spout類配紫,之前說過径密,最終會運行在task中,某個worker進程的某個executor線程內部的某個task中
* 那個task會負責去不斷的無限循環(huán)調用nextTuple()方法
* 只要的話呢躺孝,無限循環(huán)調用享扔,可以不斷發(fā)射最新的數(shù)據(jù)出去,形成一個數(shù)據(jù)流
*/
public void nextTuple() {
String[] sentences = new String[]{
"I used to watch her from my kitchen widow"
, "she seemed so small as she muscled her way through the crowd of boys on the playground"
, "The school was across the street from our home and I would often watch the kids as they played during recess"
, "A sea of children, and yet tome"
, "she stood out from them all"};
String sentence = sentences[random.nextInt(sentences.length)];
LOGGER.info(" ★★★ 發(fā)射 sentence 數(shù)據(jù) > {}", sentence);
// 這個values植袍,你可以認為就是構建一個tuple,tuple是最小的數(shù)據(jù)單位惧眠,無限個tuple組成的流就是一個stream,通過 emit 發(fā)送數(shù)據(jù)到下游bolt tuple
this.collector.emit(new Values(sentence));
}
/**
* 用于聲明當前Spout的Tuple發(fā)送流的域名字。Stream流的定義是通過OutputFieldsDeclare.declareStream方法完成的
* 通俗點說法:就是這個方法是定義一個你發(fā)射出去的每個tuple中的每個field的名稱是什么于个,作為下游
bolt 中 execute 接收數(shù)據(jù) key
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
- 在 WordCountTopology 中編寫一個 SplitSentenceBolt 靜態(tài)內部類氛魁,繼承實現(xiàn) BaseRichBolt 抽象類,用于處理上游 Spout 發(fā)送過來的數(shù)據(jù),這里做句子單詞切分
/**
*
* 編寫一個bolt秀存,用于切分每個單詞捶码,同時把單詞發(fā)送出去
* @author bill
* @date 2017年9月16日 下午8:27:45
*/
public static class SplitSentenceBolt extends BaseRichBolt{
private static final long serialVersionUID = -4758047349803579486L;
private OutputCollector collector;
/**
* 當一個Task被初始化的時候會調用此prepare方法,對于bolt來說,第一個方法或链,就是prepare方法
* OutputCollector宙项,這個也是Bolt的這個tuple的發(fā)射器,一般都會在此方法中對發(fā)送Tuple的對象OutputCollector初始化
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 這是Bolt中最關鍵的一個方法,對于Tuple的處理都可以放到此方法中進行株扛。具體的發(fā)送也是通過emit方法來完成的
* 就是說,每次接收到一條數(shù)據(jù)后汇荐,就會交給這個executor方法來執(zhí)行
* 切分單詞
*/
public void execute(Tuple input) {
// 接收上游數(shù)據(jù)
String sentence = input.getStringByField("sentence");
String[] words = sentence.split(" ");
for(String word : words){
//發(fā)射數(shù)據(jù)
this.collector.emit(new Values(word));
}
}
/**
* 用于聲明當前bolt的Tuple發(fā)送流的域名字洞就。Stream流的定義是通過OutputFieldsDeclare.declareStream方法完成的
* 通俗點說法:就是這個方法是定義一個你發(fā)射出去的每個tuple中的每個field的名稱是什么,作為下游 bolt 中 execute 接收數(shù)據(jù) key
* 定義發(fā)射出去的tuple掀淘,每個field的名稱
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
- 在 WordCountTopology 中編寫一個 WordCountBolt 靜態(tài)內部類旬蟋,繼承實現(xiàn) BaseRichBolt 抽象類,用于處理上游 Bolt 發(fā)送過來的數(shù)據(jù)革娄,這里做單詞計數(shù)
/**
*
* 單詞次數(shù)統(tǒng)計bolt
* @author bill
* @date 2017年9月16日 下午8:35:00
*/
public static class WordCountBolt extends BaseRichBolt{
private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);
private static final long serialVersionUID = -7114915627898482737L;
private OutputCollector collector;
Map<String,Long> countMap = Maps.newConcurrentMap();
/**
* 當一個Task被初始化的時候會調用此prepare方法,對于bolt來說倾贰,第一個方法,就是prepare方法
* OutputCollector拦惋,這個也是Bolt的這個tuple的發(fā)射器,一般都會在此方法中對發(fā)送Tuple的對象OutputCollector初始化
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 這是Bolt中最關鍵的一個方法匆浙,對于Tuple的處理都可以放到此方法中進行。具體的發(fā)送也是通過emit方法來完成的
* 就是說厕妖,每次接收到一條數(shù)據(jù)后首尼,就會交給這個executor方法來執(zhí)行
* 統(tǒng)計單詞
*/
public void execute(Tuple input) {
// 接收上游數(shù)據(jù)
String word = input.getStringByField("word");
Long count = countMap.get(word);
if(null == count){
count = 0L;
}
count ++;
countMap.put(word, count);
LOGGER.info(" ★★★ 單詞計數(shù)[{}] 出現(xiàn)的次數(shù):{}", word, count);
//發(fā)射數(shù)據(jù)
this.collector.emit(new Values(word,count));
}
/**
* 用于聲明當前bolt的Tuple發(fā)送流的域名字。Stream流的定義是通過OutputFieldsDeclare.declareStream方法完成的
* 通俗點說法:就是這個方法是定義一個你發(fā)射出去的每個tuple中的每個field的名稱是什么言秸,作為下游 bolt 中 execute 接收數(shù)據(jù) key
* 定義發(fā)射出去的tuple软能,每個field的名稱
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}
- 構造拓撲(Topology),提交 storm 集群執(zhí)行(**注: storm 提供了本地模擬集群举畸,可以直接在代碼編輯器中編譯執(zhí)行)
在 WordCountTopology 中編寫main 執(zhí)行方法查排,代碼如下:
public static void main(String[] args) {
//去將spout和bolts組合起來,構建成一個拓撲
TopologyBuilder builder = new TopologyBuilder();
// 第一個參數(shù)的意思抄沮,就是給這個spout設置一個名字
// 第二個參數(shù)的意思跋核,就是創(chuàng)建一個spout的對象
// 第三個參數(shù)的意思,就是設置spout的executor有幾個
builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
builder.setBolt("SplitSentence", new SplitSentenceBolt(), 5)
//為bolt 設置 幾個task
.setNumTasks(10)
//設置流分組策略
.shuffleGrouping("RandomSentence");
// fieldsGrouping 這個很重要合是,就是說了罪,相同的單詞,從SplitSentenceSpout發(fā)射出來時聪全,一定會進入到下游的指定的同一個task中
// 只有這樣子泊藕,才能準確的統(tǒng)計出每個單詞的數(shù)量
// 比如你有個單詞,hello,下游task1接收到3個hello娃圆,task2接收到2個hello
// 通過fieldsGrouping 可以將 5個hello玫锋,全都進入一個task
builder.setBolt("wordCount", new WordCountBolt(), 10)
//為bolt 設置 幾個task
.setNumTasks(20)
//設置流分組策略
.fieldsGrouping("SplitSentence", new Fields("word"));
// 運行配置項
Config config = new Config();
//說明是在命令行執(zhí)行,打算提交到storm集群上去
if(args != null && args.length > 0){
/**
* 要想提高storm的并行度可以從三個方面來改造
* worker(進程)>executor(線程)>task(實例)
* 增加work進程讼呢,增加executor線程撩鹿,增加task實例
* 對應 supervisor.slots.port 中配置個數(shù)
* 這里可以動態(tài)設置使用個數(shù)
* 最好一臺機器上的一個topology只使用一個worker,主要原因時減少了worker之間的數(shù)據(jù)傳輸
*
* 注意:如果worker使用完的話再提交topology就不會執(zhí)行,因為沒有可用的worker悦屏,只能處于等待狀態(tài)节沦,把之前運行的topology停止一個之后這個就會繼續(xù)執(zhí)行了
*/
config.setNumWorkers(3);
try {
// 將Topolog提交集群
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}else{
// 說明是在eclipse里面本地運行
// 用本地模式運行1個拓撲時,用來限制生成的線程的數(shù)量
config.setMaxTaskParallelism(20);
// 將Topolog提交本地集群
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCountTopology", config, builder.createTopology());
// 為了測試模擬等待
Utils.sleep(60000);
// 執(zhí)行完畢甫贯,關閉cluster
cluster.shutdown();
}
}
- 運行main 方法,查看效果
運行效果
總結:本次demo看蚜,講解了如何從代碼層面理解storm 的集群架構叫搁、核心概念、并行度供炎、流分組渴逻,結合上一篇文章,同時展現(xiàn)了Spout 到 Bolt音诫,Bolt 到 Bolt 的通信
以上就是本章內容惨奕,如有不對的地方,請多多指教纽竣,謝謝墓贿!
為了方便有需要的人,本系列全部軟件都在 https://pan.baidu.com/s/1qYsJZfY
下章預告:主要講解 storm 集群搭建
代碼地址附上:https://github.com/bill5/cache-project/tree/master/storm-wordcount
作者:逐暗者 (轉載請注明出處)