19. Storm Topology開發(fā)

一. wordCount Topology開發(fā):

1.spout數(shù)據(jù)收集器(SentenceSpout類):

有兩種方法來開發(fā)spout類戴涝,第一種是實現(xiàn)backtype.storm.topology.IRichSpout接口,第二種是繼承backtype.storm.topology.base.BaseRichSpout類。
其中适刀,IRichSpout接口提供了更多的一些需要實現(xiàn)的方法捅伤,BaseRichSpout類只提供了3個需要實現(xiàn)的方法吏饿。

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
    }
    @Override
    public void close() {
        // TODO Auto-generated method stub
        
    }
    @Override
    public void activate() {
        // TODO Auto-generated method stub
        
    }
    @Override
    public void deactivate() {
        // TODO Auto-generated method stub
        
    }
    @Override
    public void nextTuple() {
        // TODO Auto-generated method stub
        
    }
    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub
        
    }
    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub
        
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

上邊的這些方法中愈腾,有幾個比較重要炉爆,需要實現(xiàn)堕虹。
1.nextTuple():
在該方法中卧晓,編寫從數(shù)據(jù)源獲取數(shù)據(jù)的邏輯。該方法程序循環(huán)調(diào)用赴捞,collector.emit()向后邊的bolt發(fā)射數(shù)據(jù)逼裆。
2.declareOutputFields():
該方法聲明向后邊發(fā)射的記錄的字段名稱。
3.tuple
collector.emit()方法發(fā)射的內(nèi)容是Tuple螟炫,類型為List<Object> tuple波附。 tuple元組是一系列key艺晴,value對的集合昼钻。例如:(a:a_value,b:b_value,c:c_value,...,n:n_value)。其中封寞,collector.emit(new Values())聲明的是tuple的value值然评,而declarer.declare(new Fields())聲明的是tuple的key值,兩者是一一對應(yīng)的(假如new Values(val1,val2)狈究,那么碗淌,declarer.declare(new Fields(key1,key2))也需要聲明2個值,并且key1對一個val1抖锥,key2對應(yīng)val2)亿眠。
4.open():
該方法是初始化方法,將會第一個被調(diào)用磅废,一般纳像,我們可以在該方法內(nèi)實例化定義的類。

2.bolt組件(SplitBolt拯勉、CountBolt類):

開發(fā)bolt組件竟趾,需要實現(xiàn)backtype.storm.topology.IRichBolt接口,或者繼承類backtype.storm.topology.base.BaseRichBolt宫峦。
下面幾個方法比較重要:
1.prepare()
初始化方法岔帽,將會第一個被調(diào)用,一般导绷,我們可以在該方法內(nèi)實例化定義的類犀勒。
2.execute()
循環(huán)調(diào)用,被動執(zhí)行,前面數(shù)據(jù)來源向該bolt發(fā)射tuple的時候妥曲,就會調(diào)用execute方法账蓉。
3.declareOutputFields
與spout相同。

3.Topology驅(qū)動類(WordsToplogy類):

向集群提交Topology逾一,需要使用類backtype.storm.topology.TopologyBuilder铸本。TopologyBuilder類可以配置spout、bolt組件的記錄發(fā)射關(guān)系(前后依賴關(guān)系遵堵,例如:spout --> bolt1 -->bolt2等)箱玷。

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("Spout", new SentenceSpout());
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("Spout");
builder.setBolt("CountBolt", new CountBolt())
 .fieldsGrouping("SplitBolt", new Fields("Word"));

所謂的grouping策略就是在Spout與Bolt怨规、Bolt與Bolt之間傳遞Tuple的方式。Grouping分組策略主要有以下幾種:
1.shuffleGrouping:隨機分組锡足。將流分組定義為混排波丰。這種混排分組意味著來自Spout的輸入將混排,或隨機分發(fā)給此Bolt中的任務(wù)舶得。shuffle grouping對各個task的tuple分配的比較均勻掰烟。
2.fieldsGrouping:按照字段分組。Storm能保證所有相同F(xiàn)ield值的數(shù)據(jù)到達的是相同的Blot沐批,但是不保證一個Blot只處理一個值域纫骑。這對于分組統(tǒng)計的應(yīng)用來說是比較重要的,如果分組不正確的話會造成統(tǒng)計出錯九孩。
3.globalgrouping:全局分組先馆,前面組件的數(shù)據(jù),全部只會往該組件的其中一個上傳送。
4.allGrouping:廣播發(fā)送躺彬,即每一個Tuple煤墙,每一個Bolt都會收到。

4.Storm集群參數(shù)配置:

對于storm集群的參數(shù)宪拥,可以通過Config對象來配置仿野。

Config conf = new Config();
conf.setMaxSpoutPending(10);

也可以通過conf.put(key, value)來配置xml文件中的參數(shù)。

5.單機運行或者提交storm集群:

單機提交topology(主要用于提交集群前的測試她君,非常重要)脚作。使用LocalCluster類來提交單機測試topology:

LocalCluster local = new LocalCluster();
local.submitTopology("LocalTest", conf, builder.createTopology());

集群提交topology:

StormSubmitter.submitTopology("WordCount", conf, builder.createTopology());

6.代碼示例:

下面的代碼實現(xiàn)了一個詞頻統(tǒng)計的storm實例,功能非常簡單犁河,隨機發(fā)送sentence并拆分統(tǒng)計單詞鳖枕。
https://github.com/neil-ma/storm-pmpa/tree/master/storm-pirate/src/main/java/com/pmpa/storm/words

二. Topology并發(fā)控制:

1.并發(fā)控制組件:

Storm的并發(fā)度最終表示的Task的并發(fā)度。Storm執(zhí)行架構(gòu)有三個層次 Worker -> Executor -> Task桨螺。配置以上3個組件的數(shù)量來控制并發(fā)度宾符。
Worker進程:針對具體的Topology,worker上只運行與之相關(guān)的Topology灭翔,一個worker進程上可以啟動多個executor線程魏烫。
Executor線程:針對具體的task(spout、bolt)肝箱,一個Executor線程上可以跑多個task哄褒,默認一個Executor運行一個task。
Task:指定多個task來運行spout或者bolt組件煌张。

2.參數(shù)配置:

1.Worker進程數(shù)量:
通過Config設(shè)置 : conf.setNumWorkers(4); // 設(shè)置worker個數(shù)為4
Supervisor進程負責啟動worker呐赡,假如有3個supervior,這3個supervisor會平均配置4個Worker骏融,例如: 2 1 1 链嘀。
2.Executor個數(shù):
在構(gòu)造Topology時萌狂,在setSpout()或者setBolt()方法中設(shè)定executor的數(shù)量。例如下例子:

builder.setBolt("SplitBolt", new SplitBolt(),3).shuffleGrouping("Spout")

代碼表示怀泊,需要啟動3個executor來運行SplitBolt茫藏。需要注意的是,這里表示一共有3個executor霹琼,而不是每個worker上運行3個executor务傲。假如說,config的配置一共有2個worker枣申,那么分配的結(jié)果就是一個worker上執(zhí)行2個executor售葡,另一個worker上執(zhí)行1個executor。后邊的task配置也是一樣的道理糯而。
3.task個數(shù)
task的數(shù)量由setNumTasks()方法確定天通,例如下邊的定義:

builder.setBolt("SplitBolt", new SplitBolt(),3).shuffleGrouping("Spout")
  .setNumTasks(6);

上邊代碼表示3個executor共執(zhí)行6個task泊窘,storm會平均分配一個executor執(zhí)行2個task(系統(tǒng)自動做到盡量均勻)熄驼。如果不指定setNumTasks()方法,默認1個Executor運行一個Task烘豹,上邊代碼如果不指定setNumTasks()方法會有3個Task執(zhí)行瓜贾。

三. Storm消息可靠性保障機制:

對于某些實時大數(shù)據(jù)應(yīng)用,例如銀行的實時數(shù)據(jù)携悯、交管部門的實時數(shù)據(jù)等祭芦,需要保證數(shù)據(jù)的可靠性,在實施這類應(yīng)用時憔鬼,就需要開啟storm的消息可靠性保障機制龟劲。消息可靠性保障機制實際上就是Storm需要對spout發(fā)送的每一條消息是否被后續(xù)的bolt成功處理完成有一條反饋。

1.原理和機制:

1.ack機制:
為了保證storm的每條記錄都能正確處理轴或,Storm會對Spout發(fā)送的每一個tuple進行跟蹤昌跌。這里面包括ack/fail的處理,一個tuple處理成功是指這個Tuple以及這個Tuple產(chǎn)生的所有Tuple都被成功處理, 會調(diào)用spout的ack方法照雁;失敗是指這個Tuple或這個Tuple產(chǎn)生的所有Tuple中的某一個tuple處理失敗, 則會調(diào)用spout的fail方法蚕愤;在處理tuple的每一個bolt都會通過OutputCollector來告知storm, 當前bolt處理是否成功。
2.ack原理:
Storm中有個特殊的task名叫acker饺蚊,他們負責跟蹤spout發(fā)出的每一個Tuple的Tuple樹萍诱。當acker(框架自啟動的task)發(fā)現(xiàn)一個Tuple樹已經(jīng)處理完成了,它會發(fā)送一個消息給產(chǎn)生這個Tuple的那個task污呼。

2.實現(xiàn):

1.spout處理:
(1)spout往后發(fā)射tuple時裕坊,需要指定一個msgId。

2.bolt處理:
(1)bolt處理接收到tuple燕酷,如果還需要繼續(xù)往后邊的bolt發(fā)射籍凝,需要追溯前邊的tuple(這么做的目的是構(gòu)建Tuple樹)

collector.emit(input,new Values(word));

(2)處理完bolt映企,一定要調(diào)用collector的ack方法,

四. Trident介紹和實現(xiàn):

1. 問題:

前邊介紹的基礎(chǔ)的storm都是逐條處理數(shù)據(jù)的(一個tuple静浴、一個tuple處理)堰氓。在生產(chǎn)環(huán)境中,一般都是Kafka + Storm + HBase/Redis 架構(gòu)處理實時數(shù)據(jù)苹享。如果只是逐條處理的話双絮,對下游數(shù)據(jù)庫(HBase、Redis)的壓力就會非常大得问。
Trident是Storm提供的解決方案囤攀,一個批次一個批次處理實時數(shù)據(jù),其中一個批次封裝了多條tuple宫纬。Trident能夠提高數(shù)據(jù)處理效率和性能焚挠,同時也減小了對后端數(shù)據(jù)庫的壓力。因為Trident是以批次為單位來處理數(shù)據(jù)的漓骚,所以這里就涉及到事務(wù)的問題蝌衔。Trident中已經(jīng)封裝了事務(wù)管理、狀態(tài)管理的功能(框架幫我們自動實現(xiàn))蝌蹂,而且還封裝了一系列的常用操作噩斟,鏈式調(diào)用。真正實現(xiàn)流式處理數(shù)據(jù)孤个。
Storm從0.7版本開始引入事務(wù)管理剃允,之前版本中提供的Transactional Topology API已經(jīng)廢棄不用了。

2. Storm事務(wù)管理:

Storm事務(wù)管理分為3個層次:
(1)No Transactional:
不進行事務(wù)管理齐鲤。一個批次中的tuple可能有的成功斥废,有的失敗,不限制一致性给郊。tuple處理成功次數(shù)可能不止一次牡肉,同一個tuple可能在多個批次中處理,并且都成功丑罪,也可能一次都不成功荚板。
(2)Transactional :
保證tuple只會在一個批次中出現(xiàn),即使失敗重試吩屹,tuple的批次號還是不變的跪另,同一個tuple保證最多成功一次。
(3)Opaque Transactional:
不透明事務(wù)煤搜,和第2種類似免绿。相比于第2種,提供了容錯的機制擦盾。某些tuple在某個批次中處理失敗后嘲驾,可以在另外一個批次里處理成功(失敗后淌哟,將該tuple轉(zhuǎn)到另外一個批次中處理),但不會成功多次辽故。

3. Storm事務(wù)原理:

(1)將多條tuple封裝成一個批次徒仓,并且給該批次指定一個唯一的批次號(batchId)。
(2)后邊組件處理數(shù)據(jù)按照批次先后順序處理(前邊的批次更新后誊垢,才能處理后邊的批次)掉弛,結(jié)果的更新,一定是前面的批次更新成功后才能進行后面的批次結(jié)果更新喂走。

4. WordCountTridentTopology實現(xiàn):

TridentTopology需要開發(fā)自己的spout(以前是逐條發(fā)送tuple殃饿,現(xiàn)在的需要將多條tuple封裝成一個batch發(fā)送),自己的function(在trident中不叫bolt芋肠,而是function乎芳,實現(xiàn)的功能與bolt一樣),下面實現(xiàn)了一個最簡單的實例:
https://github.com/neil-ma/storm-pmpa/tree/master/storm-pirate/src/main/java/com/pmpa/storm/wordstrident

五. Trident編程:

1. 編寫Trident Spout:

編寫Trident Spout需要自行實現(xiàn)將tuple打包成batch的邏輯帖池。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末奈惑,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子碘裕,更是在濱河造成了極大的恐慌携取,老刑警劉巖攒钳,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件帮孔,死亡現(xiàn)場離奇詭異,居然都是意外死亡不撑,警方通過查閱死者的電腦和手機文兢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來焕檬,“玉大人姆坚,你說我怎么就攤上這事∈涤蓿” “怎么了兼呵?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長腊敲。 經(jīng)常有香客問我击喂,道長,這世上最難降的妖魔是什么碰辅? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任懂昂,我火速辦了婚禮,結(jié)果婚禮上没宾,老公的妹妹穿的比我還像新娘凌彬。我一直安慰自己沸柔,他們只是感情好,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布铲敛。 她就那樣靜靜地躺著褐澎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪伐蒋。 梳的紋絲不亂的頭發(fā)上乱凿,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天,我揣著相機與錄音咽弦,去河邊找鬼徒蟆。 笑死,一個胖子當著我的面吹牛型型,可吹牛的內(nèi)容都是我干的段审。 我是一名探鬼主播,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼闹蒜,長吁一口氣:“原來是場噩夢啊……” “哼寺枉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起绷落,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤姥闪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后砌烁,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體筐喳,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年函喉,在試婚紗的時候發(fā)現(xiàn)自己被綠了避归。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡管呵,死狀恐怖梳毙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情捐下,我是刑警寧澤账锹,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站坷襟,受9級特大地震影響奸柬,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜啤握,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一鸟缕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦懂从、人聲如沸授段。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽侵贵。三九已至,卻和暖如春缘薛,著一層夾襖步出監(jiān)牢的瞬間窍育,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工宴胧, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留漱抓,地道東北人。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓恕齐,卻偏偏與公主長得像乞娄,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子显歧,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容