安裝:
1.下載并解壓縮Zookeeper
官網(wǎng)地址:http://hadoop.apache.org/zookeeper/releases.html
解壓縮步驟此處省略....
2.修改zookeeper的配置文件
把zookeeper對(duì)應(yīng)的conf目錄下的zoo-sample.cfg重命名為zoo.cfg,配置工作目錄和端口號(hào)
# The number of milliseconds of each
ticktickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/dev/zookeeper-3.4.9/data
# the port at which the clients will connect
clientPort=2181
server.id_num1=hostname1:2888:3888
server.id_num2=hostname2:2888:3888
注意:集群模式下分別在datadir目錄下創(chuàng)建文件myid壁榕,其中的內(nèi)容為id_num,例如:
echo id_num1 > myid
3.下載并壓縮storm
官網(wǎng)地址:http://storm.apache.org/downloads.html
解壓縮步驟此處省略.....
4.修改storm配置文件
修改conf/storm.yaml,conf/storm.yaml中的配置選項(xiàng)將覆蓋conf/defaults.yaml
1):storm.zookeeper.servers:storm集群中使用的zookeeper集群的地址
storm.zookeeper.servers:
-"host_ip" (此處填寫(xiě)zookeeper集群的主機(jī)名或Ip,多個(gè)用逗號(hào)分隔)
nimbus.host:"host_ip" (此處填下nimbus進(jìn)程的主機(jī),即主節(jié)點(diǎn)ip)
storm.local.dir:"/dest/to/path" (此處填寫(xiě)storm存儲(chǔ)目錄)
supervisor.slots.ports:
- "host_ip:port" (此處填寫(xiě)從節(jié)點(diǎn)的端口號(hào),可隨意矛紫,當(dāng)采用單機(jī)模式的時(shí)候,需要寫(xiě)不同的端口號(hào),具體的個(gè)數(shù)根據(jù)從節(jié)點(diǎn)的個(gè)數(shù)來(lái)定)
5.啟動(dòng)zookeeper
注意:集群中的每臺(tái)機(jī)器都要啟動(dòng),且啟動(dòng)命令一致
進(jìn)入zookeeper的安裝目錄
#bin/zkServer.sh start
6.啟動(dòng)storm nimbus
進(jìn)入storm安裝目錄
#bin/storm nimbus
7.啟動(dòng)storm supervisor
注意:如果是單機(jī)模式,即啟動(dòng)一次即可牌里,如果是集群模式颊咬,需要每臺(tái)都要啟動(dòng),命令一致
進(jìn)入storm安裝目錄
#bin/storm supervisor
8.啟動(dòng)storm ui
進(jìn)入storm安裝目錄
#bin/storm ui
然后訪問(wèn)localhost:8080(或者主節(jié)點(diǎn)主機(jī)名:8080)就會(huì)看到storm的基本信息,到此,storm的安裝部署已經(jīng)成功
接下來(lái)進(jìn)入storm的API牡辽,首先先要了解storm中的頂層接口IComponent
storm中Spout和Bolt都是其Component(部件的意思),所以storm定義了一個(gè)名叫IComponent的接口喳篇,全家普如下:
注意:
綠色部分是我們常用的類(lèi),紅色部分是與事務(wù)有關(guān)的
BaseComponent是Storm提供的"偷懶"的類(lèi),它及其子類(lèi)态辛,或多或少實(shí)現(xiàn)了接口的部分方法麸澜,這樣我們?cè)谑褂玫臅r(shí)候,不用自己每次都寫(xiě)所有的方法,值得一提的是:像BaseXXX的類(lèi),它所實(shí)現(xiàn)的方法奏黑,都是空的炊邦,直接返回null,如果繼承這樣的類(lèi),需要自己重寫(xiě)方法熟史。下面介紹Spout和Bolt組件相關(guān)的Api
Spout
首先看一下總體圖:
從圖中很明顯的看出Spout最頂層抽象的是ISpout接口,簡(jiǎn)單介紹一樣接口中的方法:
open():初始化動(dòng)作,可以在該Spout初始化的時(shí)候做一些動(dòng)作,傳遞上下文等
close():該Spout關(guān)閉之前執(zhí)行,但不能得到保證一定可以執(zhí)行.Spout是作為T(mén)ask運(yùn)行在Worker中的,在Cluster模式下馁害,supervisor會(huì)直接kill -9 worker的進(jìn)程,這樣它就無(wú)法執(zhí)行了.而在本地模式下,如果是發(fā)送停止命令,是可以保證close方法的執(zhí)行的.
activate()和deactivate():一個(gè)Spout可以被暫時(shí)激活和關(guān)閉以故,這兩個(gè)方法可以在對(duì)應(yīng)的時(shí)刻調(diào)用執(zhí)行
nextTuple():用來(lái)發(fā)射數(shù)據(jù),Spout中最核心的部分,一些具體的需求可以在該方法中實(shí)現(xiàn)
ack():一個(gè)Tuple會(huì)有唯一一個(gè)id,當(dāng)該Tuple被成功處理蜗细,會(huì)執(zhí)行該方法
fail():與ack()方法同理,當(dāng)Tuple處理失敗會(huì)調(diào)用該方法
總結(jié):
通常情況下(shell和事務(wù)除外),實(shí)現(xiàn)一個(gè)Spout怒详,可以直接實(shí)現(xiàn)IRichSpout,如果不想寫(xiě)多余的代碼,可以繼承BaseRichSpout
Bolt
同樣炉媒,首先看下總體圖:
可以看出為什么IBasicBolt沒(méi)有繼承IBolt?
我們先看下IBolt的方法:
我們需要知道的是IBolt繼承了java.io.Serializable,我們?cè)趎imbus上提交了Topology后,創(chuàng)建出來(lái)的Bolt會(huì)序列化發(fā)送到具體執(zhí)行的Worker上,Worker在執(zhí)行該Bolt時(shí)昆烁,會(huì)首先調(diào)用prepare方法傳入當(dāng)前執(zhí)行的上下文
execute(Tuple):接收一個(gè)Tuple進(jìn)行處理,并用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail方法(表示失敗)來(lái)反饋結(jié)果
cleanup():同Ispout的close方法吊骤,不能保證其一定被執(zhí)行
好了,現(xiàn)在可以回答為什么IBasicBolt沒(méi)有繼承IBolt這個(gè)問(wèn)題了静尼,Storm提供了IBasicBolt接口白粉,其目的就是實(shí)現(xiàn)該接口的Bolt不用在代碼中反饋結(jié)果了,storm內(nèi)部會(huì)自動(dòng)反饋結(jié)果
總結(jié):
通常情況下實(shí)現(xiàn)一個(gè)Bolt,可以實(shí)現(xiàn)IRichBolt接口或繼承BaseRichBolt,如果不想自己處理反饋結(jié)果,可以實(shí)現(xiàn)IBasicBolt接口或繼承BaseBasicBolt,它實(shí)際上是自己做掉了prepare方法和collector.emit.ack(inputTuple)方法.
OK,介紹完了簡(jiǎn)單的方法,下面寫(xiě)一個(gè)簡(jiǎn)單的Demo,加深一下對(duì)Spout和Bolt的理解
簡(jiǎn)單需求:對(duì)名稱(chēng)加后綴并轉(zhuǎn)換成大寫(xiě)
RandomWordSpout.java
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class RandomWordSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
//模擬一些數(shù)據(jù)
String[] str = {"hello","word","you","how","are"};
//初始化方法鼠渺,在spout組件實(shí)例化時(shí)調(diào)用一次
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple{
//隨機(jī)挑選出一個(gè)名稱(chēng)
Random random = new Random();
int index = random.nextInt(str.length);
//獲取名稱(chēng)
String name = str[index];
//將名稱(chēng)進(jìn)行封裝成tuple鸭巴,發(fā)送消息給下一個(gè)組件
collector.emit(new Vaules(name));
}
//聲明本spout組件發(fā)送出去的tuple中的數(shù)據(jù)的字段名
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("orignname"));
}
}
UpperBolt.java
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class UpperBolt extends BaseBasicBolt{
//業(yè)務(wù)處理邏輯
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//先獲取到上一個(gè)組件傳遞過(guò)來(lái)的數(shù)據(jù),數(shù)據(jù)在tuple里面
String godName = tuple.getString(0);
//將名稱(chēng)轉(zhuǎn)換成大寫(xiě)
String godName_upper = godName.toUpperCase();
//將轉(zhuǎn)換完成的商品名發(fā)送出去
collector.emit(new Values(godName_upper));
}
//聲明該bolt組件要發(fā)出去的tuple的字段
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("uppername"));
}
}
SuffixBolt.java
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class SuffixBolt extends BaseBasicBolt{
FileWriter fileWriter = null;
//在bolt組件運(yùn)行過(guò)程中只會(huì)被調(diào)用一次
@Override
public void prepare(Map stormConf, TopologyContext context) {
try {
fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//該bolt組件的核心處理邏輯
//每收到一個(gè)tuple消息,就會(huì)被調(diào)用一次
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//先拿到上一個(gè)組件發(fā)送過(guò)來(lái)的名稱(chēng)
String upper_name = tuple.getString(0);
String suffix_name = upper_name + "_itisok";
//為上一個(gè)組件發(fā)送過(guò)來(lái)的商品名稱(chēng)添加后綴
try {
fileWriter.write(suffix_name);
fileWriter.write("\n");
fileWriter.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
TopoMain.java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
/**
* 組織各個(gè)處理組件形成一個(gè)完整的處理流程拦盹,就是所謂的topology(類(lèi)似于mapreduce程序中的job)
* 并且將該topology提交給storm集群去運(yùn)行鹃祖,topology提交到集群后就將永無(wú)休止地運(yùn)行,除非人為或者異常退出
*/
public class TopoMain {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
//將我們的spout組件設(shè)置到topology中去
//parallelism_hint :4 表示用4個(gè)excutor來(lái)執(zhí)行這個(gè)組件
//setNumTasks(8) 設(shè)置的是該組件執(zhí)行時(shí)的并發(fā)task數(shù)量普舆,也就意味著1個(gè)excutor會(huì)運(yùn)行2個(gè)task
builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
//將大寫(xiě)轉(zhuǎn)換bolt組件設(shè)置到topology恬口,并且指定它接收randomspout組件的消息
//.shuffleGrouping("randomspout")包含兩層含義:
//1校读、upperbolt組件接收的tuple消息一定來(lái)自于randomspout組件
//2、randomspout組件和upperbolt組件的大量并發(fā)task實(shí)例之間收發(fā)消息時(shí)采用的分組策略是隨機(jī)分組shuffleGrouping
builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
//將添加后綴的bolt組件設(shè)置到topology祖能,并且指定它接收upperbolt組件的消息
builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
//用builder來(lái)創(chuàng)建一個(gè)topology
StormTopology demotop = builder.createTopology();
//配置一些topology在集群中運(yùn)行時(shí)的參數(shù)
Config conf = new Config();
//這里設(shè)置的是整個(gè)demotop所占用的槽位數(shù)歉秫,也就是worker的數(shù)量
conf.setNumWorkers(4);
conf.setDebug(true);
conf.setNumAckers(0);
//將這個(gè)topology提交給storm集群運(yùn)行
StormSubmitter.submitTopology("demotopo", conf, demotop);
}
}
最后將工程打包,在集群上運(yùn)行
#storm jar jar_name.jar class_name args0 ....