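Transaction control in Storm is something of an art, and the ack mechanism is its essence. The book Storm源碼分析 (Storm Source Code Analysis) gives an excellent analysis of it.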
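Anyone who develops with Storm has probably been stuck on this problem for a long time: a function fails, throws an exception, and the exception stops the whole topology. Depending on the business requirements, we then have to choose one of the following two options:
1. Record the exception and keep the whole topology running.
2. Tell the spout to re-emit the data. Because the smallest unit of a Trident stream is the batch, re-emitting means replaying the entire failed batch.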
The first case is easy to handle:
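import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordFunction extends BaseFunction {
    private static final long serialVersionUID = 735468688795780833L;
    private static final Logger logger = LoggerFactory.getLogger(WordFunction.class);

    /**
     * Receives the data stream.
     * Each call receives one tuple of the batch.
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        try {
            // normal business logic
        } catch (Exception e) {
            // record the exception; nothing is rethrown, so the topology keeps running
            logger.error("Failed to process tuple {}", tuple, e);
        }
    }
}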
Catching the exception with try...catch and handling it there keeps the topology running.
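To make the first option concrete outside of Storm, here is a minimal plain-Java sketch of the log-and-continue pattern, with no Storm dependency: each record is handled inside its own try/catch, a failure is recorded, and processing moves on to the next record. The class `LogAndContinue`, its `process` method and the parse-an-integer "business logic" are hypothetical stand-ins for the body of `execute`.

```java
import java.util.ArrayList;
import java.util.List;

public class LogAndContinue {
    /** Result of one pass: how many records succeeded and which ones failed. */
    static final class Result {
        final int succeeded;
        final List<String> failed;

        Result(int succeeded, List<String> failed) {
            this.succeeded = succeeded;
            this.failed = failed;
        }
    }

    /** Hypothetical business logic: parse the record as an integer (throws on bad input). */
    static int businessLogic(String record) {
        return Integer.parseInt(record);
    }

    /** Processes every record; an exception is recorded and does not stop the loop. */
    static Result process(List<String> records) {
        int succeeded = 0;
        List<String> failed = new ArrayList<>();
        for (String record : records) {
            try {
                businessLogic(record);
                succeeded++;
            } catch (Exception e) {
                // stands in for logger.error(...): remember the failure and keep going
                failed.add(record);
            }
        }
        return new Result(succeeded, failed);
    }

    public static void main(String[] args) {
        Result r = process(List.of("1", "x", "3"));
        System.out.println(r.succeeded + " succeeded, failed: " + r.failed); // 2 succeeded, failed: [x]
    }
}
```

The bad record "x" is skipped and the records after it are still processed, which is exactly the trade-off of option 1: the failed data is only logged, never retried.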
The second case is also simple:
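import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordFunction extends BaseFunction {
    private static final long serialVersionUID = 735468688795780833L;
    private static final Logger logger = LoggerFactory.getLogger(WordFunction.class);

    /**
     * Receives the data stream.
     * Each call receives one tuple of the batch.
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        try {
            // normal business logic
        } catch (Exception e) {
            // record the exception, then fail the batch so the spout replays it
            logger.error("Failed to process tuple {}", tuple, e);
            throw new FailedException(e.getMessage(), e);
        }
    }
}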
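All you need to do is wrap the caught exception in a FailedException, which lives in the org.apache.storm.topology package.
Why does it have to be FailedException? Setting a breakpoint on throw new FailedException(e.getMessage(), e); and stepping through shows that the public void execute(Tuple tuple) method of TridentBoltExecutor, the executor that runs the bolt, contains this block: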
try {
_bolt.execute(tracked.info, tuple);
if(tracked.condition.expectedTaskReports==0) {
success = finishBatch(tracked, tuple);
}
}
catch(FailedException e) {
failBatch(tracked, e);
}
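After TridentBoltExecutor catches a FailedException it calls failBatch. Following failBatch further shows that this eventually increments the attempt number on the TransactionAttempt object by 1 and calls the spout's emitBatch method again, so the whole batch is replayed. Any other exception type is not caught by this block and propagates out of execute, which is what stops the topology.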
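The control flow above can be modeled in a few lines of plain Java. This is a simplified sketch, not Storm's implementation: `ReplayModel`, its local `FailedException` and the `emitBatch` function are hypothetical stand-ins. Only `FailedException` leads to a replay with the attempt id incremented; any other runtime exception escapes, mirroring the `catch(FailedException e)` in the snippet. Two simplifications: the model stops at the first failing tuple (the log further down shows Storm still running the rest of a failed batch through the function), and `maxAttempts` is a hypothetical bound added so the loop always terminates.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

public class ReplayModel {
    /** Local stand-in for org.apache.storm.topology.FailedException (not the Storm class). */
    static class FailedException extends RuntimeException {
        FailedException(String message) {
            super(message);
        }
    }

    /**
     * Emits batch txid, runs the function on each tuple and, if the function throws
     * FailedException, replays the whole batch with attemptId + 1.
     * Any other exception is not caught and propagates to the caller.
     *
     * @return the attemptId that finally succeeded
     */
    static int runBatch(long txid,
                        BiFunction<Long, Integer, List<String>> emitBatch,
                        Consumer<String> function,
                        int maxAttempts) {
        for (int attemptId = 0; attemptId < maxAttempts; attemptId++) {
            List<String> batch = emitBatch.apply(txid, attemptId);
            try {
                for (String tuple : batch) {
                    function.accept(tuple);
                }
                return attemptId; // every tuple processed: this attempt succeeds
            } catch (FailedException e) {
                // counterpart of failBatch: discard this attempt; the loop replays the batch
            }
        }
        throw new IllegalStateException("batch " + txid + " failed " + maxAttempts + " times");
    }

    public static void main(String[] args) {
        // hypothetical data: attempt 0 contains the bad tuple "l", attempt 1 does not
        List<List<String>> attempts = List.of(List.of("b", "l"), List.of("y", "d"));
        int succeededOn = runBatch(1L,
                (txid, attemptId) -> attempts.get(attemptId),
                tuple -> {
                    if (tuple.equals("l")) {
                        throw new FailedException("bad tuple " + tuple);
                    }
                },
                5);
        System.out.println("batch 1 succeeded on attemptId " + succeededOn); // attemptId 1
    }
}
```

Swapping the `FailedException` in the lambda for, say, an `IllegalArgumentException` makes `runBatch` throw instead of replaying, which is the model's version of the topology stopping.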
Complete demo
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.ITridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordSpout implements ITridentSpout<String> {
    private static final long serialVersionUID = -954626449213280061L;
    private static final Logger logger = LoggerFactory.getLogger("Trident Spout");
    private final String chars = "abcdefghijklmnopqrstuvwxyz";

    /**
     * Coordinator.
     * Stores the metadata needed to replay a batch; when a batch is replayed,
     * it is rebuilt from the metadata kept by the coordinator.
     */
    @Override
    public BatchCoordinator<String> getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new WordCoordinator();
    }

    @Override
    public Emitter<String> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new WordEmitter();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Declares all fields emitted by the spout.
     */
    @Override
    public Fields getOutputFields() {
        return new Fields("field1", "field2");
    }

    private class WordCoordinator implements BatchCoordinator<String> {
        @Override
        public String initializeTransaction(long txid, String prevMetadata, String currMetadata) {
            // this demo stores no metadata
            return null;
        }

        @Override
        public void success(long txid) {
            logger.info("success: {}", txid);
        }

        @Override
        public boolean isReady(long txid) {
            logger.info("begin {}", txid);
            return true;
        }

        @Override
        public void close() {
        }
    }

    /**
     * Emitter.
     * Sends the data stream.
     */
    private class WordEmitter implements Emitter<String> {
        @Override
        public void success(TransactionAttempt tx) {
            logger.info("emitter success {}", tx.getId());
        }

        @Override
        public void close() {
        }

        /**
         * The set of tuples sent by one call of this method is called a batch.
         * The batch is the smallest unit in which Trident sends a stream.
         */
        @Override
        public void emitBatch(TransactionAttempt tx, String coordinatorMeta, TridentCollector collector) {
            logger.info("TransactionId : {},AttemptId : {},currMetadata : {}",
                    tx.getTransactionId(), tx.getAttemptId(), coordinatorMeta);
            for (int i = 0; i < 10; i++) {
                // field1: one random letter, field2: a constant tag
                String letter = String.valueOf(chars.charAt(ThreadLocalRandom.current().nextInt(chars.length())));
                collector.emit(new Values(letter, "event2"));
            }
        }
    }
}
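import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordFunction extends BaseFunction {
    private static final long serialVersionUID = 735468688795780833L;
    private static final Logger logger = LoggerFactory.getLogger(WordFunction.class);

    /**
     * Receives the data stream; each call receives one tuple of the batch.
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String value = tuple.getValueByField("field1").toString();
        logger.info("funtion value : {}", value);
        // letters strictly between 'h' and 'n' (i through m) fail the whole batch
        if (value.charAt(0) > 'h' && value.charAt(0) < 'n') {
            throw new FailedException();
        }
    }
}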
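import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

public class Start {
    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        WordSpout spout = new WordSpout();
        WordFunction function = new WordFunction();
        topology.newStream("filter", spout)
                // pass only "field1" of the spout's stream into the function; it emits no new fields
                .each(new Fields("field1"), function, new Fields());
        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("MyStorm", conf, buildTopology());
        // let the local topology run for one minute, then shut it down
        Thread.sleep(1000 * 60);
        cluster.shutdown();
    }
}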
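The spout emits 10 letters per batch, and the function checks each letter it receives. If a letter lies strictly between h and n (that is, i through m), the transaction is considered failed: a FailedException is thrown and the spout is notified to replay the batch.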
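The failure rule can be checked offline against the letters in the log that follows. This plain-Java sketch reuses the same comparison as `WordFunction` (`> 'h'` and `< 'n'`, so only i, j, k, l and m fail, while h and n pass); `FailureRule`, `failsTuple` and `failsBatch` are hypothetical helper names. A batch attempt fails as soon as any one of its 10 letters fails.

```java
import java.util.List;

public class FailureRule {
    /** Same test as WordFunction: true for letters strictly between 'h' and 'n'. */
    static boolean failsTuple(String value) {
        char c = value.charAt(0);
        return c > 'h' && c < 'n';
    }

    /** A batch attempt fails if any one of its tuples fails. */
    static boolean failsBatch(List<String> batch) {
        return batch.stream().anyMatch(FailureRule::failsTuple);
    }

    public static void main(String[] args) {
        // letters copied from the log: TransactionId 1, AttemptId 0 and AttemptId 1
        List<String> tx1Attempt0 = List.of("b", "v", "b", "v", "n", "l", "j", "e", "g", "y");
        List<String> tx1Attempt1 = List.of("y", "d", "z", "q", "g", "t", "p", "z", "q", "h");
        System.out.println(failsBatch(tx1Attempt0)); // true: l and j fail the batch
        System.out.println(failsBatch(tx1Attempt1)); // false: h is not strictly after 'h'
    }
}
```

This agrees with the log: attempt 0 of transaction 1 is replayed, attempt 1 is followed by "emitter success 1" and "success: 1", and attempt 0 of transaction 2 fails again because it contains k and l.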
The log shows:
16237 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO Trident Spout - begin 1
16348 [Thread-22-spout-filter-executor[5 5]] INFO Trident Spout - TransactionId : 1,AttemptId : 0,currMetadata : null
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : b
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : v
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : b
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : v
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : n
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : l
16362 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : j
16362 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : e
16365 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : g
16366 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : y
16377 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO Trident Spout - begin 1
16495 [Thread-22-spout-filter-executor[5 5]] INFO Trident Spout - TransactionId : 1,AttemptId : 1,currMetadata : null
16497 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : y
16497 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : d
16497 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : z
16497 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : q
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : g
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : t
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : p
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : z
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : q
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : h
16506 [Thread-22-spout-filter-executor[5 5]] INFO Trident Spout - emitter success 1
16506 [Thread-14-$spoutcoord-spout-filter-executor[2 2]] INFO Trident Spout - success: 1
16567 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO Trident Spout - begin 2
16684 [Thread-22-spout-filter-executor[5 5]] INFO Trident Spout - TransactionId : 2,AttemptId : 0,currMetadata : null
16686 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : n
16686 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : r
16686 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : q
16687 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : a
16687 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : k
16687 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : s
16687 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : k
16688 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : c
16688 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : l
16688 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : n
16693 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO Trident Spout - begin 2
16791 [Thread-22-spout-filter-executor[5 5]] INFO Trident Spout - TransactionId : 2,AttemptId : 1,currMetadata : null
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : i
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : y
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : y
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : i
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : w
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : m
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : j
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : a
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : w
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : w
The first emission of a batch has AttemptId 0, and each replay increments it by 1.
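In the log above, each replay carries different letters from the attempt it replaces, because the emitter draws random letters and `initializeTransaction` returns `null`, so the coordinator stores no metadata. The spout's comments say the coordinator is where replay metadata belongs. A minimal plain-Java sketch of that idea, assuming the metadata is simply one random seed per transaction id (`SeededReplay`, its `initializeTransaction` and `emitBatch` are hypothetical stand-ins, not Storm's interfaces): because the batch is built only from the stored seed, every attempt of the same transaction emits the same letters.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class SeededReplay {
    private static final String CHARS = "abcdefghijklmnopqrstuvwxyz";

    /** Hypothetical coordinator state: one seed stored per transaction id. */
    private final Map<Long, Long> metadataByTxid = new HashMap<>();
    private final Random seedSource = new Random();

    /** Creates the metadata once per txid; later calls (replays) return the stored value. */
    long initializeTransaction(long txid) {
        return metadataByTxid.computeIfAbsent(txid, id -> seedSource.nextLong());
    }

    /** Builds the batch only from the metadata, so every attempt yields the same letters. */
    static List<String> emitBatch(long seed, int size) {
        Random random = new Random(seed);
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            batch.add(String.valueOf(CHARS.charAt(random.nextInt(CHARS.length()))));
        }
        return batch;
    }

    public static void main(String[] args) {
        SeededReplay coordinator = new SeededReplay();
        List<String> attempt0 = emitBatch(coordinator.initializeTransaction(1L), 10);
        List<String> attempt1 = emitBatch(coordinator.initializeTransaction(1L), 10); // replay
        System.out.println(attempt0.equals(attempt1)); // true: the replay re-emits the same batch
    }
}
```

Note that with deterministic replay, a batch that contains a failing letter would fail on every attempt; that is the intended transactional behavior, and it is why the random demo above eventually succeeds only because each replay happens to draw new letters.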