Storm Trident之二事務控制

Storm中的事務控制是門藝術举农,其中ack機制是精髓虑凛,可以參考Storm源碼分析 一書蔫浆,其中有精彩的分析殖属。

在storm開發(fā)過程中,相信一直有一個困擾很久的問題:function執(zhí)行失敗克懊,拋出了某個異常對象而導致topology終止運行忱辅。這時我們需要根據(jù)業(yè)務情況從下面2種選擇中作出判斷七蜘。

1.記錄本次異常谭溉,整個topology繼續(xù)運行墙懂。

2.通知spout重發(fā)數(shù)據(jù)。由于Trident中數(shù)據(jù)流的最小單位為batch,所以重發(fā)數(shù)據(jù)意味重發(fā)失敗的整個batch扮念。

第一種情況很好處理
public class WordFunction extends BaseFunction {

    /**
     * 
     */
    private static final long serialVersionUID = 735468688795780833L;

    /**
     * 接收數(shù)據(jù)流
     * 每次接收batch中一條數(shù)據(jù)
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        try{
            //    正常業(yè)務
        }
        catch (Exception e) {
            //    記錄異常日志
        }
        
    }

    private Logger logger = LoggerFactory.getLogger(getClass());
}

通過try...catch捕獲異常损搬,并進行相應處理。保證topology繼續(xù)運行柜与。

第二種情況處理也很簡單
public class WordFunction extends BaseFunction {

    /**
     * 
     */
    private static final long serialVersionUID = 735468688795780833L;

    /**
     * 接收數(shù)據(jù)流
     * 每次接收batch中一條數(shù)據(jù)
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        try{
            //    正常業(yè)務
        }
        catch (Exception e) {
            //    記錄異常日志
            throw new FailedException(e.getMessage(), e);
        }
        
    }

    private Logger logger = LoggerFactory.getLogger(getClass());
}

只需要將捕獲的異常轉換為FailedException即可巧勤,F(xiàn)ailedException位于org.apache.storm.topology中。

為什么必須是FailedException呢弄匕?通過在throw new FailedException(e.getMessage(), e);設置斷點跟蹤可觀察到在Bolt的執(zhí)行器TridentBoltExecutor類public void execute(Tuple tuple)方法中有代碼塊

try {
    _bolt.execute(tracked.info, tuple);
    if(tracked.condition.expectedTaskReports==0) {
        success = finishBatch(tracked, tuple);
    }
}
catch(FailedException e) {
    failBatch(tracked, e);
}

TridentBoltExecutor捕獲到FailedException后調(diào)用了failBatch方法颅悉,繼續(xù)跟蹤failBatch方法最終會在事務對象TransactionAttempt上將事務嘗試號+1并調(diào)用spout的emitBatch方法。

完整demo
public class WordSpout implements ITridentSpout<String> {
    
    /**
     * 
     */
    private static final long serialVersionUID = -954626449213280061L;
    
    private String chars = "abcdefghijklmnopqrstuvwxyz";
    

    /**
     * 協(xié)調(diào)器
     * 負責保存重放batch元數(shù)據(jù)迁匠,當重放一個batch時剩瓶,通過協(xié)調(diào)器中保存的元數(shù)據(jù)創(chuàng)建batch
     */
    @Override
    public BatchCoordinator<String> getCoordinator(String txStateId,Map conf, TopologyContext context) {
        return new WordCoordinator();
    }

    @Override
    public Emitter<String> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new WordEmitter();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * 定義發(fā)送的所有字段
     */
    @Override
    public Fields getOutputFields() {
        return new Fields("field1","field2");
    }
    
    private class WordCoordinator implements BatchCoordinator<String> {

        @Override
        public String initializeTransaction(long txid, String prevMetadata, String currMetadata) {
            return null;
        }

        @Override
        public void success(long txid) {
            logger.info("success: " + txid);
        }

        @Override
        public boolean isReady(long txid) {
            logger.info("begin {}", txid);
            return Boolean.TRUE;
        }

        @Override
        public void close() {
            
        }
        
    }
    
    /**
     * 發(fā)射器
     * 發(fā)送數(shù)據(jù)流
     *
     */
    private class WordEmitter implements Emitter<String> {

        @Override
        public void success(TransactionAttempt tx) {
            logger.info("emitter success " + tx.getId());
        }

        @Override
        public void close() {
        }
        
        /**
         * 每次調(diào)用本方法所發(fā)送的數(shù)據(jù)集合被稱為batch
         * batch是Trident中發(fā)送數(shù)據(jù)流的最小單元
         */
        @Override
        public void emitBatch(TransactionAttempt tx, String coordinatorMeta, TridentCollector collector) {
            
            logger.info("TransactionId : {},AttemptId : {},currMetadata : {}",tx.getTransactionId(),tx.getAttemptId(),coordinatorMeta);
            
            for(int i=0;i<10;i++){
                List list = Lists.newArrayList();
                list.add("" + chars.charAt((int)(Math.random() * 26)));
                list.add("event2");
                collector.emit(list);
            }
        }
    }

    private Logger logger = LoggerFactory.getLogger("Trident Spout");
}

public class WordFunction extends BaseFunction {

    /**
     * 
     */
    private static final long serialVersionUID = 735468688795780833L;

    /**
     * 接收數(shù)據(jù)流 每次接收batch中一條數(shù)據(jù)
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String value = tuple.getValueByField("field1").toString();
        logger.info("funtion value : " + value);
        if (value.charAt(0) > 'h' && value.charAt(0) < 'n') {
            throw new FailedException();
        }
    }

    private Logger logger = LoggerFactory.getLogger(getClass());
}

public class Start {

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        WordSpout spout = new WordSpout();
        WordFunction function = new WordFunction();

        topology.newStream("filter", spout)
                /**
                 * 將spout發(fā)送的數(shù)據(jù)流中哪些字段傳入bolt中
                 */
                .each(new Fields("field1"), function, new Fields());

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("MyStorm", conf, buildTopology());

        Thread.sleep(1000 * 60);
        cluster.shutdown();
    }
}

spout每次發(fā)送10個字母,bolt對每次接收到的字母進行判斷城丧,如果該字母位于h--n之間則認為事務失敗延曙,拋出FailedException。通知spout重發(fā)batch

日志可見

16237 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 1
16348 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 1,AttemptId : 0,currMetadata : null
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : b
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : v
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : b
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : v
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : n
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : l
16362 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : j
16362 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : e
16365 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : g
16366 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
16377 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 1
16495 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 1,AttemptId : 1,currMetadata : null
16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : d
16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : z
16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : q
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : g
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : t
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : p
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : z
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : q
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : h
16506 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - emitter success 1
16506 [Thread-14-$spoutcoord-spout-filter-executor[2 2]] INFO  Trident Spout - success: 1
16567 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 2
16684 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 2,AttemptId : 0,currMetadata : null
16686 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : n
16686 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : r
16686 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : q
16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : a
16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : k
16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : s
16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : k
16688 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : c
16688 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : l
16688 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : n
16693 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 2
16791 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 2,AttemptId : 1,currMetadata : null
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : i
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : i
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : w
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : m
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : j
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : a
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : w
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : w

第一次發(fā)送batch,AttemptId為0亡哄,每次重發(fā)則+1枝缔。

我們注意到重發(fā)的batch與該batch第一次發(fā)送時的數(shù)據(jù)內(nèi)容不一致。這在實際項目肯定是不允許的蚊惯。如何保證一個batch是否經(jīng)歷重發(fā)數(shù)據(jù)內(nèi)容一致愿卸?需要使用到BatchCoordinator.initializeTransaction方法所提供的元數(shù)據(jù)。

batch發(fā)送成功后截型,Emitter.success和BatchCoordinator.success均會被調(diào)用擦酌。但所處線程不同,Emitter.success與Emitter.emitBatch處于同一線程菠劝,而BatchCoordinator.success則處于協(xié)調(diào)器線程赊舶。個人習慣于在Emitter.success中處理發(fā)送成功后的邏輯處理。

由于分區(qū)事務IPartitionedTridentSpout通常與kafka結合使用且處理方式大同小異赶诊,so不再展開笼平。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市舔痪,隨后出現(xiàn)的幾起案子寓调,更是在濱河造成了極大的恐慌,老刑警劉巖锄码,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件夺英,死亡現(xiàn)場離奇詭異晌涕,居然都是意外死亡,警方通過查閱死者的電腦和手機痛悯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門余黎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人载萌,你說我怎么就攤上這事惧财。” “怎么了扭仁?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵垮衷,是天一觀的道長。 經(jīng)常有香客問我乖坠,道長搀突,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任熊泵,我火速辦了婚禮仰迁,結果婚禮上,老公的妹妹穿的比我還像新娘戈次。我一直安慰自己轩勘,他們只是感情好,可當我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布怯邪。 她就那樣靜靜地躺著绊寻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪悬秉。 梳的紋絲不亂的頭發(fā)上澄步,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天署鸡,我揣著相機與錄音湃鹊,去河邊找鬼。 笑死痢虹,一個胖子當著我的面吹牛武氓,可吹牛的內(nèi)容都是我干的梯皿。 我是一名探鬼主播,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼县恕,長吁一口氣:“原來是場噩夢啊……” “哼东羹!你這毒婦竟也來了?” 一聲冷哼從身側響起忠烛,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤属提,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冤议,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡斟薇,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了恕酸。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片堪滨。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖尸疆,靈堂內(nèi)的尸體忽然破棺而出椿猎,到底是詐尸還是另有隱情惶岭,我是刑警寧澤寿弱,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站按灶,受9級特大地震影響症革,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜鸯旁,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一噪矛、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧铺罢,春花似錦艇挨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至泉瞻,卻和暖如春脉漏,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背袖牙。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工侧巨, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人鞭达。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓司忱,卻偏偏與公主長得像,于是被迫代替她去往敵國和親畴蹭。 傳聞我的和親對象是個殘疾皇子坦仍,可洞房花燭夜當晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內(nèi)容