Storm Trident之一spout和bolt

Trident Spout


Trident Spout特點

  • Trident中,定義Spout的接口為ITridentSpout围来。
  • Trident Spout必須以批量形式發(fā)送tuple。
  • Trident Spout不真正執(zhí)行數(shù)據(jù)的發(fā)送茶袒,而是由ITridentSpout.Emitter負責發(fā)送數(shù)據(jù)褪迟。同時引入了協(xié)調器的概念,協(xié)調器負責管理數(shù)據(jù)發(fā)送的批次和元數(shù)據(jù)弦赖,當事務失敗時,調度Emitter根據(jù)元數(shù)據(jù)重新發(fā)送數(shù)據(jù)浦辨。協(xié)調器接口為ITridentSpout.BatchCoordinator蹬竖。

maven依賴

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
</dependency>

Trident Spout簡單實現(xiàn)

    public class WordSpout implements ITridentSpout<String> {
        
        /**
         * 
         */
        private static final long serialVersionUID = -954626449213280061L;
    
        /**
         * 協(xié)調器
         * 負責保存重放batch元數(shù)據(jù),當重放一個batch時荤牍,通過協(xié)調器中保存的元數(shù)據(jù)創(chuàng)建batch
         */
        @Override
        public BatchCoordinator<String> getCoordinator(String txStateId,Map conf, TopologyContext context) {
            return new WordCoordinator();
        }
    
        @Override
        public Emitter<String> getEmitter(String txStateId, Map conf, TopologyContext context) {
            return new WordEmitter();
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        /**
         * 定義發(fā)送的所有字段
         */
        @Override
        public Fields getOutputFields() {
            return new Fields("field1","field2");
        }
        
        private class WordCoordinator implements BatchCoordinator<String> {
    
            @Override
            public String initializeTransaction(long txid, String prevMetadata, String currMetadata) {
                return null;
            }
    
            @Override
            public void success(long txid) {
                logger.info("success: " + txid);
            }
    
            @Override
            public boolean isReady(long txid) {
                return Boolean.TRUE;
            }
    
            @Override
            public void close() {
                
            }
            
        }
        
        /**
         * 發(fā)射器
         * 發(fā)送數(shù)據(jù)流
         *
         */
        private class WordEmitter implements Emitter<String> {
    
            @Override
            public void success(TransactionAttempt tx) {
                logger.info("emitter success " + tx.getId());
            }
    
            @Override
            public void close() {
            }
            
            /**
             * 每次調用本方法所發(fā)送的數(shù)據(jù)集合被稱為batch
             * batch是Trident中發(fā)送數(shù)據(jù)流的最小單元
             */
            @Override
            public void emitBatch(TransactionAttempt tx, String coordinatorMeta, TridentCollector collector) {
                for(int i=0;i<10;i++){
                    List list = Lists.newArrayList();
                    list.add("event1");
                    list.add("event2");
                    collector.emit(list);
                }
            }
            
        }
    
        private Logger logger = LoggerFactory.getLogger("Trident Spout");
    }
重點說明

Emitter定義的emitBatch方法案腺。該方法實現(xiàn)了發(fā)送哪里數(shù)據(jù)。該方法每執(zhí)行一次康吵,發(fā)送的所有數(shù)據(jù)被稱為batch劈榨。batch中的每條數(shù)據(jù)被稱為tuple。

ITridentSpout.getOutputFields定義了每條tuple有哪些字段晦嵌,本例中定義了2個字段同辣,字段名為"field1"、"field2"惭载。在Emitter.emitBatch中每條tuple均符合該定義旱函。本例中每次調用emitBatch方法發(fā)送的數(shù)據(jù)內容及格式可以假象如下:

[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]

每次調用均發(fā)送了10條數(shù)據(jù)(10個tuple),這10個tuple構成1個batch,tuple均符合ITridentSpout.getOutputFields中定義的字段

Trident bolt


Trident bolt特點

  • Trident中沒有bolt接口,而是分為了Filter和Function兩類

Trident Function簡單實現(xiàn)

public class WordFunction extends BaseFunction {

    /**
     * 
     */
    private static final long serialVersionUID = 735468688795780833L;

    /**
     * 接收數(shù)據(jù)流
     * 每次接收batch中一條數(shù)據(jù)
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        logger.info(tuple.getValueByField("field1").toString());
    }

    private Logger logger = LoggerFactory.getLogger("Trident Function");
}
重點說明

BaseFunction已經(jīng)實現(xiàn)了Function接口描滔。

execute方法用于具體實現(xiàn)接收到tuple后如何處理棒妨。每次接收1個tuple。在本例中emitter每次發(fā)送1個batch,每個batch有10條數(shù)據(jù)含长,則每次發(fā)送數(shù)據(jù)券腔,execute方法均會被調用10次伏穆。

在tuple中可以獲取數(shù)據(jù)流中的數(shù)據(jù),能夠獲取的字段受TridentTopology對象的控制纷纫。

在execute方法中處理完成后可繼續(xù)使用TridentCollector對象繼續(xù)發(fā)送數(shù)據(jù)到下一節(jié)點枕扫,F(xiàn)unction發(fā)送數(shù)據(jù)時只能添加新的字段,不能修改或刪除已有的字段

啟動TridentTopology


public class Start {

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        WordSpout spout = new WordSpout();
        WordFunction function = new WordFunction();

        topology.newStream("filter", spout)
                /**
                 * 將spout發(fā)送的數(shù)據(jù)流中哪些字段傳入bolt中
                 */
                .each(new Fields("field1"), function, new Fields());

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("MyStorm", conf, buildTopology());

        Thread.sleep(1000 * 60);
        cluster.shutdown();
    }

}
重點說明

topology定義數(shù)據(jù)流時指定function可以讀取哪些字段。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市虹统,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌参滴,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,835評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件剖笙,死亡現(xiàn)場離奇詭異卵洗,居然都是意外死亡请唱,警方通過查閱死者的電腦和手機弥咪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,900評論 2 383
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來十绑,“玉大人聚至,你說我怎么就攤上這事”境龋” “怎么了扳躬?”我有些...
    開封第一講書人閱讀 156,481評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長甚亭。 經(jīng)常有香客問我贷币,道長,這世上最難降的妖魔是什么亏狰? 我笑而不...
    開封第一講書人閱讀 56,303評論 1 282
  • 正文 為了忘掉前任役纹,我火速辦了婚禮,結果婚禮上暇唾,老公的妹妹穿的比我還像新娘促脉。我一直安慰自己,他們只是感情好策州,可當我...
    茶點故事閱讀 65,375評論 5 384
  • 文/花漫 我一把揭開白布瘸味。 她就那樣靜靜地躺著,像睡著了一般够挂。 火紅的嫁衣襯著肌膚如雪旁仿。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,729評論 1 289
  • 那天孽糖,我揣著相機與錄音枯冈,去河邊找鬼汁胆。 笑死,一個胖子當著我的面吹牛霜幼,可吹牛的內容都是我干的嫩码。 我是一名探鬼主播,決...
    沈念sama閱讀 38,877評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼罪既,長吁一口氣:“原來是場噩夢啊……” “哼铸题!你這毒婦竟也來了?” 一聲冷哼從身側響起琢感,我...
    開封第一講書人閱讀 37,633評論 0 266
  • 序言:老撾萬榮一對情侶失蹤丢间,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后驹针,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體烘挫,經(jīng)...
    沈念sama閱讀 44,088評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,443評論 2 326
  • 正文 我和宋清朗相戀三年柬甥,在試婚紗的時候發(fā)現(xiàn)自己被綠了饮六。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,563評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡苛蒲,死狀恐怖卤橄,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情臂外,我是刑警寧澤窟扑,帶...
    沈念sama閱讀 34,251評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站漏健,受9級特大地震影響嚎货,放射性物質發(fā)生泄漏。R本人自食惡果不足惜蔫浆,卻給世界環(huán)境...
    茶點故事閱讀 39,827評論 3 312
  • 文/蒙蒙 一殖属、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧克懊,春花似錦忱辅、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,712評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至扮念,卻和暖如春损搬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,943評論 1 264
  • 我被黑心中介騙來泰國打工巧勤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留嵌灰,地道東北人。 一個月前我還...
    沈念sama閱讀 46,240評論 2 360
  • 正文 我出身青樓颅悉,卻偏偏與公主長得像沽瞭,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子剩瓶,可洞房花燭夜當晚...
    茶點故事閱讀 43,435評論 2 348

推薦閱讀更多精彩內容

  • 一. wordCount Topology開發(fā): 1.spout數(shù)據(jù)收集器(SentenceSpout類): 有...
    奉先閱讀 1,180評論 0 0
  • 這是一個JStorm使用教程驹溃,不包含環(huán)境搭建教程,直接在公司現(xiàn)有集群上跑任務延曙,關于JStorm集群環(huán)境搭建豌鹤,后續(xù)研...
    Coselding閱讀 6,306評論 1 9
  • 聲明 本文首發(fā)于個人技術博客,轉載請注明出處枝缔,本文鏈接:http://qifuguang.me/2015/11/2...
    winwill2012閱讀 2,187評論 1 15
  • 簡介: Trident 是 Storm 的一種高度抽象的實時計算模型布疙,它可以將高吞吐量(每秒百萬級)數(shù)據(jù)輸入、有狀...
    hello_coke閱讀 3,237評論 0 1
  • 現(xiàn)在的社會太匆忙愿卸,人情關系太復雜灵临,有些事情只能靠自己解決,無論是工作還是生活擦酌。有自己的想法俱诸,就應該想各種辦法菠劝,最終...
    平靜的海洋閱讀 1,546評論 0 0