1.api介紹
生成Topology
Map conf = new HashMp();
//topology所有自定義的配置均放入這個Map
TopologyBuilder builder = new TopologyBuilder();
//創(chuàng)建topology的生成器
int spoutParal = get("spout.parallel", 1);
//獲取spout的并發(fā)設(shè)置
SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
new SequenceSpout(), spoutParal);
//創(chuàng)建Spout, 其中new SequenceSpout() 為真正spout對象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 為spout的名字,注意名字中不要含有空格
int boltParal = get("bolt.parallel", 1);
//獲取bolt的并發(fā)設(shè)置
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
//創(chuàng)建bolt寂拆, SequenceTopologyDef.TOTAL_BOLT_NAME 為bolt名字浊伙,TotalCount 為bolt對象,boltParal為bolt并發(fā)數(shù)晌缘,
//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME),
//表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的數(shù)據(jù)痢站,并且以shuffle方式磷箕,
//即每個spout隨機輪詢發(fā)送tuple到下一級bolt中
int ackerParal = get("acker.parallel", 1);
Config.setNumAckers(conf, ackerParal);
//設(shè)置表示acker的并發(fā)數(shù)
int workerNum = get("worker.num", 10);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);
//表示整個topology將使用幾個worker
conf.put(Config.STORM_CLUSTER_MODE, "distributed");
//設(shè)置topolog模式為分布式,這樣topology就可以放到JStorm集群上運行
StormSubmitter.submitTopology(streamName, conf,
builder.createTopology());
//提交topology
IRichSpout
IRichSpout 為最簡單的Spout接口
IRichSpout{
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
}
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void nextTuple() {
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
其中注意:
=>spout對象必須是繼承Serializable阵难, 因此要求spout內(nèi)所有數(shù)據(jù)結(jié)構(gòu)必須是可序列化的
=>spout可以有構(gòu)造函數(shù)岳枷,但構(gòu)造函數(shù)只執(zhí)行一次,是在提交任務(wù)時,創(chuàng)建spout對象空繁,因此在task分配到具體worker之前的初始化工作可以在此處完成殿衰,一旦完成,初始化的內(nèi)容將攜帶到每一個=>task內(nèi)(因為提交任務(wù)時將spout序列化到文件中去盛泡,在worker起來時再將spout從文件中反序列化出來)闷祥。
=>open是當(dāng)task起來后執(zhí)行的初始化動作
=>close是當(dāng)task被shutdown后執(zhí)行的動作
=>activate 是當(dāng)task被激活時,觸發(fā)的動作
=>deactivate 是task被deactive時傲诵,觸發(fā)的動作
=>nextTuple 是spout實現(xiàn)核心凯砍, nextuple完成自己的邏輯,即每一次取消息后拴竹,用collector 將消息emit出去悟衩。
=>ack, 當(dāng)spout收到一條ack消息時栓拜,觸發(fā)的動作座泳,詳情可以參考 ack機制
=>fail, 當(dāng)spout收到一條fail消息時幕与,觸發(fā)的動作钳榨,詳情可以參考 ack機制
=>declareOutputFields, 定義spout發(fā)送數(shù)據(jù)纽门,每個字段的含義
=>getComponentConfiguration 獲取本spout的component 配置
Bolt
IRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void execute(Tuple input) {
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
其中注意:
=>bolt對象必須是繼承Serializable薛耻, 因此要求spout內(nèi)所有數(shù)據(jù)結(jié)構(gòu)必須是可序列化的
=>bolt可以有構(gòu)造函數(shù),但構(gòu)造函數(shù)只執(zhí)行一次赏陵,是在提交任務(wù)時饼齿,創(chuàng)建bolt對象,因此在task分配到具體worker之前的初始化工作可以在此處完成蝙搔,一旦完成缕溉,初始化的內(nèi)容將攜帶到每一個task內(nèi)(因為提交任務(wù)時將bolt序列化到文件中去,在worker起來時再將bolt從文件中反序列化出來)吃型。
=>prepare是當(dāng)task起來后執(zhí)行的初始化動作
=>cleanup是當(dāng)task被shutdown后執(zhí)行的動作
=>execute是bolt實現(xiàn)核心证鸥, 完成自己的邏輯,即接受每一次取消息后勤晚,處理完枉层,有可能用collector 將產(chǎn)生的新消息emit出去。 ** 在executor中赐写,當(dāng)程序處理一條消息時鸟蜡,需要執(zhí)行collector.ack, 詳情可以參考 ack機制 ** 在executor中挺邀,當(dāng)程序無法處理一條消息時或出錯時贷笛,需要執(zhí)行collector.fail ,詳情可以參考 ack機制
=>declareOutputFields珍德, 定義bolt發(fā)送數(shù)據(jù),每個字段的含義
=>getComponentConfiguration 獲取本bolt的component 配置
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client</artifactId>
<version>0.9.3.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client-extension</artifactId>
<version>0.9.3.1</version>
<scope>provided</scope>
</dependency>
打包
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>storm.starter.SequenceTopology</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
提交jar
xxxx.jar 為打包后的jar
com.alibaba.xxxx.xx 為入口類疲眷,即提交任務(wù)的類
parameter即為提交參數(shù)
jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter
2.Ack原理
Storm中有個特殊的task名叫acker,他們負(fù)責(zé)跟蹤spout發(fā)出的每一個Tuple的Tuple樹(因為一個tuple通過spout發(fā)出了您朽,經(jīng)過每一個bolt處理后咪橙,會生成一個新的tuple發(fā)送出去)。當(dāng)acker(框架自啟動的task)發(fā)現(xiàn)一個Tuple樹已經(jīng)處理完成了虚倒,它會發(fā)送一個消息給產(chǎn)生這個Tuple的那個task美侦。Acker的跟蹤算法是Storm的主要突破之一,對任意大的一個Tuple樹魂奥,它只需要恒定的20字節(jié)就可以進行跟蹤菠剩。
Acker跟蹤算法的原理:acker對于每個spout-tuple保存一個ack-val的校驗值,它的初始值是0耻煤,然后每發(fā)射一個Tuple或Ack一個Tuple時具壮,這個Tuple的id就要跟這個校驗值異或一下,并且把得到的值更新為ack-val的新值哈蝇。那么假設(shè)每個發(fā)射出去的Tuple都被ack了棺妓,那么最后ack-val的值就一定是0。Acker就根據(jù)ack-val是否為0來判斷是否完全處理炮赦,如果為0則認(rèn)為已完全處理怜跑。
要實現(xiàn)ack機制:
1,spout發(fā)射tuple的時候指定messageId
2吠勘,spout要重寫B(tài)aseRichSpout的fail和ack方法
3性芬,spout對發(fā)射的tuple進行緩存(否則spout的fail方法收到acker發(fā)來的messsageId,spout也無法獲取到發(fā)送失敗的數(shù)據(jù)進行重發(fā))剧防,看看系統(tǒng)提供的接口植锉,只有msgId這個參數(shù),這里的設(shè)計不合理峭拘,其實在系統(tǒng)里是有cache整個msg的俊庇,只給用戶一個messageid,用戶如何取得原來的msg貌似需要自己cache鸡挠,然后用這個msgId去查詢辉饱,太坑爹了3,spout根據(jù)messageId對于ack的tuple則從緩存隊列中刪除宵凌,對于fail的tuple可以選擇重發(fā)鞋囊。
4,設(shè)置acker數(shù)至少大于0;Config.setNumAckers(conf, ackerParal);
阿里自己的Jstorm會提供
public interface IFailValueSpout { void fail(Object msgId, List<object>values); }
這樣更合理一些, 可以直接取得系統(tǒng)cache的msg values
ack機制即瞎惫,spout發(fā)送的每一條消息溜腐,在規(guī)定的時間內(nèi),spout收到Acker的ack響應(yīng)瓜喇,即認(rèn)為該tuple 被后續(xù)bolt成功處理
在規(guī)定的時間內(nèi)(默認(rèn)是30秒)挺益,沒有收到Acker的ack響應(yīng)tuple,就觸發(fā)fail動作乘寒,即認(rèn)為該tuple處理失敗望众,timeout時間可以通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來設(shè)定。
l或者收到Acker發(fā)送的fail響應(yīng)tuple伞辛,也認(rèn)為失敗烂翰,觸發(fā)fail動作
注意,我開始以為如果繼承BaseBasicBolt那么程序拋出異常蚤氏,也會讓spout進行重發(fā)甘耿,但是我錯了,程序直接異常停止了
這里我以分布式程序入門案例worldcount為例子吧竿滨。
問題:
有沒有想過佳恬,如果該tuple的眾多子tuple中,某一個子tuple處理
failed了于游,但是另外的子tuple仍然會繼續(xù)執(zhí)行毁葱,如果子tuple都是執(zhí)
行數(shù)據(jù)存儲操作,那么就算整個消息失敗贰剥,那些生成的子tuple還
是會成功執(zhí)行而不會回滾的倾剿。
(1)關(guān)于Storm如何處理重復(fù)的tuple問題
有人問到Storm 是怎么處理重復(fù)的tuple?
因為Storm 要保證tuple 的可靠處理蚌成,當(dāng)tuple 處理失敗或者超時的時候柱告,spout 會fail并重新發(fā)送該tuple,那么就會有tuple 重復(fù)計算的問題笑陈。這個問題是很難解決的际度,storm也沒有提供機制幫助你解決。不過也有一些可行的策略:
(1)不處理涵妥,這也算是種策略乖菱。因為實時計算通常并不要求很高的精確度,后
續(xù)的批處理計算會更正實時計算的誤差蓬网。
(2)使用第三方集中存儲來過濾窒所,比如利用MySQL、MemCached 或者Redis 根據(jù)邏輯主鍵來去重帆锋。
(3)使用bloom filter 做過濾吵取,簡單高效。
(2)關(guān)于Storm的ack和fail問題
在學(xué)習(xí)storm的過程中锯厢,有不少人對storm的Spout組件中的ack及fail相關(guān)的問題存在困惑皮官,這里做一個簡要的概述脯倒。
Storm保證每一個數(shù)據(jù)都得到有效處理,這是如何保證的呢捺氢?正是ack及fail機制確保數(shù)據(jù)都得到處理的保證藻丢,但是storm只是提供給我們一個接口,而具體的方法得由我們自己來實現(xiàn)摄乒。例如在spout下一個拓?fù)涔?jié)點的bolt上悠反,我們定義某種情況下為數(shù)據(jù)處理失敗,則調(diào)用fail馍佑,則我們可以在fail方法中進行數(shù)據(jù)重發(fā)斋否,這樣就保證了數(shù)據(jù)都得到了處理。其實拭荤,通過讀storm的源碼茵臭,里面有講到,有些類(BaseBasicBolt穷劈?)是會自動調(diào)用ack和fail的笼恰,不需要我們程序員去ack和fail,但是其他Bolt就沒有這種功能了歇终。