本文將學習如何使用java創(chuàng)建Storm拓撲
Storm集群的組件
Storm集群類似于Hadoop集群绝页,只不過 Hadoop 上運行"MapReduce jobs"迄汛, Storm 上運行"topologies"。
兩者最大的差別是槽畔,MapReducejobs 最終是完成的栈妆,而 topologies 是一直處理消息(或直到你殺死它)。
集群 | 任務名稱 | 任務時效性 |
---|---|---|
Storm | topologies(拓撲) | 一直處理消息(或直到你殺死它) |
Hadoop | MapReduce jobs | 最終是完成的 |
Storm集群上有兩種節(jié)點:master 和 worker 節(jié)點
- master:
運行一個名為 Nimbus 的守護進程,
負責在集群周圍分發(fā)代碼厢钧,
為機器分配任務以及監(jiān)控故障鳞尔。
(類似 Hadoop 的 JobTracker) - worker:
運行一個名為 Supervisor 的守護進程,
負責監(jiān)聽、并根據(jù)需要啟動早直、停止 "Nimbus" 分配給其的任務铅檩。
每個工作進程都執(zhí)行拓撲的子集。 運行拓撲由分布在許多計算機上的許多工作進程組成莽鸿。
Nimbus 和 Supervisors 之間的協(xié)調(diào)是通過 Zookeeper 實現(xiàn)的昧旨。
此外,Nimbus 守護程序和 Supervisors 守護程序是 fail-fast 和 stateless;
所有狀態(tài)都保存在Zookeeper或本地磁盤上祥得。這意味著你可以通過 kill -9
殺死 Nimbus 或者 Supervisors 兔沃,但是它們會像沒事一樣重新開始。
這種設計使Storm集群非常穩(wěn)定级及。
Topologies
要想在 Storm 上進行實時計算乒疏,你需要創(chuàng)建一個 topologies 。
topologies 是一個計算圖饮焦,topologies中的每個節(jié)點包含計算邏輯怕吴,并且通過節(jié)點之間的連接定義了數(shù)據(jù)在節(jié)點之間的流動方向。
運行拓撲很簡單县踢。首先转绷,將所有代碼和依賴項打包到一個jar中。然后硼啤,運行如下命令:
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
額 這個命令沒啥好解釋的....
Streams
Stream 是一個無限的元組序列议经,是 Storm 抽象的核心。
Storm 提供了以分布式和可靠的方式進行 Stream 傳換的原語谴返。
比如將微博 Stream 轉(zhuǎn)換為轉(zhuǎn)換熱門主題 Stream煞肾。
Storm 為進行 Stream 轉(zhuǎn)換提供 spouts 和 bolt 兩個基本原語。
- spouts
spouts 是 Stream 的來源嗓袱。
例如籍救,spout可以讀取Kestrel隊列中的元組并將其作為 Stream 發(fā)出∏ǎ或者 spout 可以連接到Twitter API并發(fā)出推文流蝙昙。 - bolt
bolt會消耗任意數(shù)量的輸入流闪萄,進行一些處理,并可能發(fā)出新的流耸黑。
像從推文流計算趨勢主題流之類的復雜流轉(zhuǎn)換桃煎,需要多個步驟篮幢,因此需要多個 bolt 大刊。
bolt 可以執(zhí)行任何操作,包括運行函數(shù)三椿,過濾元組缺菌,進行流聚合,進行流連接搜锰,與數(shù)據(jù)庫對話等等伴郁。
spout 和 bolt 網(wǎng)絡被打包成一個 topology ,這是提交給 Storm 集群執(zhí)行的頂級抽象蛋叼。
拓撲是流轉(zhuǎn)換的圖形焊傅,其中每個節(jié)點都是一個 spout 或 bolt 。
圖中的表示哪些 bolt 訂閱了哪些流狈涮。
當一個 spout 或 bolt 向一個流發(fā)出一個元組時狐胎,它會將元組發(fā)送給訂閱該流的每個 bolt 。
拓撲中節(jié)點之間的鏈接指示應如何傳遞元組歌馍。
如上圖握巢,Spout A 和Bolt B 之間有鏈接,Spout A 到 Bolt C 之間有鏈接松却,以及從 Bolt B 到 Bolt C 之間有鏈接暴浦。
那么每次 Spout A 發(fā)出一個元組時,它都會將元組發(fā)送給 Bolt B 和 Bolt C .所有 Bolt B 的輸出元組也將送給 Bolt C.
Storm拓撲中的每個節(jié)點并行執(zhí)行晓锻。
在拓撲中歌焦,你可以為每個節(jié)點指定所需的并行度,Storm將在集群中生成該數(shù)量的線程以執(zhí)行砚哆。
拓撲會一直執(zhí)行(或直到你殺死它)同规。
Storm會自動重新分配失敗的任務。
此外窟社,Storm保證不會丟失數(shù)據(jù)券勺,即使計算機出現(xiàn)故障并且消息丟失也是如此。
Data model
Storm使用元組作為其數(shù)據(jù)模型灿里。
元組是一個命名的值列表关炼,元組中的字段可以是任何類型的對象。
Storm支持所有原始類型匣吊,字符串和字節(jié)數(shù)組作為元組字段值儒拂。
要使用其他類型的對象寸潦,需要為該類型實現(xiàn)一個序列化程序。
拓撲中的每個節(jié)點都必須聲明它發(fā)出的元組的輸出字段社痛。
例如下面代碼中的bolt 聲明它發(fā)出2元組见转,字段為 "double" 和 "triple"
package com.aaa.test;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* @author lillcol
* 2019/7/18-11:46
*/
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollector _collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));//聲明["double", "triple"]組件的輸出字段
}
}
一個簡單的拓撲(A simple topology)
如何實現(xiàn)一個簡單的拓撲?
本地 idea測試
sbt構(gòu)建
// libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0" % "provided"
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0"
定義一個Spout蒜哀,此處采用隨機數(shù)
package com.test.storm;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
/**
* @author lillcol
* 2019/7/18-12:03
*/
public class TestWordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
//Spouts負責向拓撲中發(fā)送新消息
Utils.sleep(100);
//每隔100ms就會從列表中隨機選一個單詞發(fā)出
final String[] words = new String[]{"hellow", "lillcol", "study", "storm"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
collector.emit(new Values(word));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
定義Bolt斩箫,功能接收到的信息追加"levelUp!"
package com.test.storm;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* @author lillcol
* 2019/7/18-12:04
*/
public class ExclamationBolt extends BaseRichBolt {
OutputCollector collector;
//prepare方法為 Bolt 提供了一個OutputCollector用于從 Bolt 中發(fā)出元組 。
//元組可以隨時的從prepare撵儿,execute乘客,cleanup,甚至在另一個線程中異步發(fā)出淀歇。
//當前prepare實現(xiàn)只是將OutputCollector作為實例變量保存易核,以便稍后在execute方法中使用。
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector =collector;
}
//execute方法從一個Bolt的輸入接收一個元組浪默。
//此execute取數(shù)組的第一個字段并發(fā)出追加字符串“l(fā)evelUp!” 得字符串牡直。
//如果您實現(xiàn)了一個訂閱多個輸入源的bolt,您可以通過使用Tuple#getSourceComponent方法找出Tuple來自哪個組件纳决。
@Override
public void execute(Tuple input) {
String sourceComponent = input.getSourceComponent();
//輸入元組作為第一個參數(shù)傳遞emit
collector.emit(input, new Values(input.getString(0) + "levelUp!"));
System.out.println(input.getString(0));
// 輸入元組在最后一行被激活碰逸。這些是Storm的可靠性API的一部分,用于保證不會丟失數(shù)據(jù)
collector.ack(input);
}
//declareOutputFields方法聲明ExclamationBolt發(fā)出1元組岳链,其中一個字段稱為“word”花竞。
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
//如果implements IRichBol
//還需要重寫下面兩個方法
//當Bolt被關(guān)閉時調(diào)用cleanup方法,并且應該清除所有打開的資源掸哑。
//無法保證在集群上調(diào)用此方法:例如约急,如果任務正在運行的計算機爆炸,則無法調(diào)用該方法苗分。
@Override
public void cleanup() {
}
//getComponentConfiguration方法允許配置此組件運行方式的各個方面
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
//但是一般情況下我們不需要這兩個方法厌蔽,所以我們可以通過繼承BaseRichBolt來定義Bolt
定義調(diào)用類
package com.test.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.shade.org.apache.jute.Utils;
import org.apache.storm.topology.TopologyBuilder;
/**
* @author lillcol
* 2019/7/18-12:03
*/
public class SimpleTopology {
public static void main(String[] args) throws Exception {
SimpleTopology topology = new SimpleTopology();
topology.runLocal(60);
}
public void runLocal(int waitSeconds) throws Exception {
TopologyBuilder topologyBuilder = new TopologyBuilder();
//第一個參數(shù)是給Spout一個id "words",
//第二個參數(shù)是要調(diào)用的Spout類
//第三個參數(shù)是節(jié)點所需的并行度摔癣,是可選的奴饮。它指示應在群集中執(zhí)行該組件的線程數(shù)。如果省略它择浊,Storm將只為該節(jié)點分配一個線程戴卜。
topologyBuilder.setSpout("words", new TestWordSpout(), 1);
//Bolt的參數(shù)與Spout
//只是要通過shuffleGrouping 指定數(shù)據(jù)來源"words")
//“shuffle grouping”意味著元組應該從輸入任務隨機分配到bolt的任務中。
topologyBuilder.setBolt("DoubleAndTripleBolt1", new ExclamationBolt(), 1)
.shuffleGrouping("words");
//一個Bolt可以接收多個數(shù)據(jù)來源琢岩,是要多次調(diào)用shuffleGrouping即可
topologyBuilder.setBolt("DoubleAndTripleBolt2", new ExclamationBolt(), 1)
.shuffleGrouping("DoubleAndTripleBolt1")
.shuffleGrouping("words");
//loacl 測試
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word_count", config, topologyBuilder.createTopology());
org.apache.storm.utils.Utils.sleep(1000*10);
cluster.killTopology("word_count");
cluster.shutdown();
}
}
運行結(jié)果:
study
study
studylevelUp!
study
study
studylevelUp!
hellow
hellow
hellowlevelUp!
lillcol
lillcol
lillcollevelUp!
hellow
hellow
hellowlevelUp!
hellow
hellow
hellowlevelUp!
lillcol
lillcol
lillcollevelUp!
. . .
異常
可能出現(xiàn)異常1:
java.lang.NoClassDefFoundError: org/apache/storm/topology/IRichSpout
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.storm.topology.IRichSpout
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
Error: A JNI error has occurred, please check your installation and try again
這個是因為在sbt構(gòu)建的時候 % "provided" 意思是已提供相關(guān)jar投剥,但是我們idea測試的時候并沒有相關(guān)jar
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0" % "provided"
所以不能用上面的語句,改成下面的即可
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0"
maven 對應著改就可以了
可能出現(xiàn)異常2:
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.log4j.LogManager.getLogger(LogManager.java:44)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
at org.apache.storm.LocalCluster.<clinit>(LocalCluster.java:128)
at com.test.storm.SimpleTopology.runLocal(SimpleTopology.java:28)
at com.test.storm.SimpleTopology.main(SimpleTopology.java:16)
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
... 7 more
錯誤報的很明顯
log4j-over-slf4j.jar AND slf4j-log4j12.jar 沖突了
我的解決辦法是在測試的時候隨便刪掉一個担孔,但是生產(chǎn)的時候在可能沖突的依賴中把它去掉
Storm 的 的hellow word(word count)
//定義Spout WordReader
package com.test.storm;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
/**
* @author lillcol
* 2019/7/19-9:17
*/
public class WordReader extends BaseRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
/**
* open方法江锨,接收三個參數(shù):
* 第一個是創(chuàng)建Topology的配置吃警,
* 第二個是所有的Topology數(shù)據(jù)
* 第三個是用來把Spout的數(shù)據(jù)發(fā)射給bolt
**/
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
try {
//獲取創(chuàng)建Topology時指定的要讀取的文件路徑
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["
+ conf.get("wordFile") + "]");
}
//初始化發(fā)射器
this.collector = collector;
}
/**
* nextTuple是Spout最主要的方法:
* 在這里我們讀取文本文件,并把它的每一行發(fā)射出去(給bolt)
* 這個方法會不斷被調(diào)用啄育,為了降低它對CPU的消耗酌心,當任務完成時讓它sleep一下
**/
@Override
public void nextTuple() {
//如果要看到tail效果,去掉這個 if (completed) 語句塊挑豌,我測試的時候不去掉看不到效果只有報錯
if (completed) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return;
}
String str;
BufferedReader bufferedReader = new BufferedReader(fileReader);
try {
while ((str = bufferedReader.readLine()) != null) {
//發(fā)送一行
collector.emit(new Values(str), str);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
completed = true;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
//定義Bolt WordSplit 實現(xiàn)切割
package com.test.storm;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author lillcol
* 2019/7/19-9:38
*/
public class WordSplit implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* execute是bolt中最重要的方法:
* 當接收到一個tuple時安券,此方法被調(diào)用
* 這個方法的作用就是把接收到的每一行切分成單個單詞,并把單詞發(fā)送出去(給下一個bolt處理)
**/
@Override
public void execute(Tuple input) {
String line = input.getString(0);
String[] words = line.split(",| |\\|");
for (String word : words) {
word = word.trim();
if (!word.isEmpty()) {
List a = new ArrayList();
a.add(input);
collector.emit(a, new Values(word));
}
}
collector.ack(input);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
//定義Bolt WordCounter 實現(xiàn)統(tǒng)計
package com.test.storm;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* @author lillcol
* 2019/7/19-10:01
*/
public class WordCounter implements IRichBolt {
Integer id;
String name;
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if (!counters.containsKey(str)) {
counters.put(str, 1);
} else {
counters.put(str, counters.get(str) + 1);
}
//如果要看到tail效果浮毯,應該在這里打印統(tǒng)計信息
// System.out.println(str+":"+counters.get(str) );
collector.ack(input);
}
//這里只是最后一次打印完疫,要tail效果不應該在這里打印統(tǒng)計信息泰鸡。
@Override
public void cleanup() {
System.out.println("--Word Counter [" + name + "-" + id + "] --");
for (Map.Entry<String, Integer> entry : counters.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
counters.clear();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
//定義主類
package com.test.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
/**
* @author lillcol
* 2019/7/19-10:33
*/
public class WordCountTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("wordReader",new WordReader(),1);
topologyBuilder.setBolt("WordSplit",new WordSplit(),1)
.shuffleGrouping("wordReader");
topologyBuilder.setBolt("",new WordCounter(),2)
.shuffleGrouping("WordSplit");
//配置
Config config = new Config();
config.put("wordsFile","D:\\stromFile");
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
//創(chuàng)建一個本地模式cluster
LocalCluster localCluster = new LocalCluster();
//提交Topology
localCluster.submitTopology("WordCountTopology",config,topologyBuilder.createTopology());
Thread.sleep(2000);//這個時間要控制好债蓝,太短看不到效果
localCluster.shutdown();
}
}
//輸出結(jié)果
11:35:16.845 [SLOT_1024] INFO o.a.s.e.ExecutorShutdown - Shutting down executor :[2, 2]
11:35:16.845 [Thread-37--executor[2, 2]] INFO o.a.s.u.Utils - Async loop interrupted!
--Word Counter [-2] --
Thread[SLOT_1027:73
40673ms:1
11:34:31.865:1
30724ms:1
11:34:23.065:1
11:34:27.365:1
. . .
11:35:16.846 [SLOT_1024] INFO o.a.s.e.ExecutorShutdown - Shut down executor :[2, 2]
11:35:16.846 [SLOT_1024] INFO o.a.s.e.ExecutorShutdown - Shutting down executor :[1, 1]
11:35:16.846 [Thread-38--executor[1, 1]] INFO o.a.s.u.Utils - Async loop interrupted!
--Word Counter [-1] --
Thread[SLOT_1027:87
11:34:31.465:1
29524ms:1
26024ms:1
. . .