Storm Trident(一)官方Tutorial

原文鏈接:http://storm.apache.org/releases/1.0.2/Trident-tutorial.html

本人原創(chuàng)翻譯摊崭,轉(zhuǎn)載請注明出處

Trident是基于Storm做實時計算的高等級的抽象讼油。它允許你無縫集成高吞吐量(每秒100萬級別的消息)、無狀態(tài)流處理呢簸、低延時的分布式查詢矮台。 如果你熟悉Pig或Cascading等高級別的批處理工具,就會很熟悉Trident的概念——Trident有joins, aggregations, grouping, functions, and filters阔墩。除此以外帽蝶,Trident原生支持任何數(shù)據(jù)庫或持久化存儲之上的有狀態(tài)的躺酒、增加的處理。Trident有一致的堂鲜、僅一次的語義伞芹,所以很容易推出Trident topologies忘苛。(最后這句不是很理解,請參考原文)唱较。

示例

這個例子會做兩件事:
1.從輸入句子流中計算單詞的數(shù)量
2.實現(xiàn)查詢:給出單詞列表扎唾,返回單詞數(shù)量之和

這個例子從下面的數(shù)據(jù)源中讀取無限的句子流:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"));
spout.setCycle(true);

下面是做單詞計數(shù)的代碼:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

我們一行一行的看代碼。第一行南缓,TridentTopology對象被創(chuàng)建胸遇,暴露出構(gòu)成Trident計算的接口。TridentTopology 有一個方法newStream汉形,讀取輸入纸镊,創(chuàng)建一個新的流。在這里概疆,輸入源就是之前定義的FixedBatchSpout逗威。輸入源也可以是Kestrel或Kafka這樣的消息隊列。對每個輸入源岔冀,Trident跟蹤一個較小數(shù)量的狀態(tài)(關(guān)于已消費消息的元數(shù)據(jù))凯旭,狀態(tài)信息存放在zookeeper中。zookeeper的節(jié)點根據(jù)字符串"spout1"來確定狀態(tài)元數(shù)據(jù)的存放位置。

Trident以小批量tuples的形式處理流罐呼,例如鞠柄,輸入的句子流可能會被拆分成batchs,像這樣:

batched-stream.png

batchs的大小與輸入流的大小掛鉤弄贿,輸入流吞吐量大春锋,batchs就會大。

Trident提供了成熟的API來處理這些小型batchs差凹。API和Pig或Cascading的很像:可以做group by's, joins, aggregations, 運行functions, 運行filters等等期奔。當然,孤立的處理每個小型batch沒什么意義危尿,所以Trident提供了多個batchs聚合及持久化的函數(shù)——包括內(nèi)存, Memcached, Cassandra呐萌,等各種存儲。最后谊娇,Trident有一流的查詢實時狀態(tài)源的函數(shù)肺孤,狀態(tài)可以被Trident 更新(就像這個例子),或者也可能是獨立的狀態(tài)源济欢。

回到例子赠堵,spout發(fā)出"sentence"流,topology 定義的下一行啟用了Split函數(shù)法褥,把句子轉(zhuǎn)成單詞茫叭。 Split定義:

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));                
       }
   }
}

topology剩下的部分計算單詞數(shù)量并持久化存儲。首先以"word"字段分組半等,然后每個分組以Count聚合器持久化聚合揍愁。persistentAggregate 函數(shù)知道如何存儲聚合的結(jié)果。這個例子中杀饵,單詞計數(shù)保存在內(nèi)存中莽囤,但是也可以輕易的切換到Memcached, Cassandra等其他持久化。例如切換到Memcached:

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        
MemcachedState.transactional()

"serverLocations"是Memcached集群的host/port列表切距。

Trident 很酷的一點是它有完全的容錯和“僅一次”處理機制朽缎。這會使你的實時處理更易理解(reason about)。當失敗和重傳發(fā)生的時候谜悟,Trident 保存的狀態(tài)使它不會為同一個數(shù)據(jù)多次更新數(shù)據(jù)庫话肖。

persistentAggregate把流傳輸給TridentState 對象,這個例子中赌躺,TridentState 代表了所有的單詞計數(shù)狼牺,我們將使用它來進行分布式查詢。

topology 的下一個部分實現(xiàn)了單詞計數(shù)的低延時分布式查詢礼患。輸入是以空格分隔的單詞組是钥,輸出這些單詞的數(shù)量總和掠归。這些查詢像普通RPC調(diào)用的一樣執(zhí)行,不過是以分布式形式悄泥。例如:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"

像這種簡單查詢的延時大概10ms虏冻。

topology 的分布式查詢實現(xiàn)如下:

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市弹囚,隨后出現(xiàn)的幾起案子厨相,更是在濱河造成了極大的恐慌,老刑警劉巖鸥鹉,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蛮穿,死亡現(xiàn)場離奇詭異,居然都是意外死亡毁渗,警方通過查閱死者的電腦和手機践磅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來灸异,“玉大人府适,你說我怎么就攤上這事》握粒” “怎么了檐春?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長么伯。 經(jīng)常有香客問我疟暖,道長,這世上最難降的妖魔是什么蹦狂? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任誓篱,我火速辦了婚禮朋贬,結(jié)果婚禮上凯楔,老公的妹妹穿的比我還像新娘。我一直安慰自己锦募,他們只是感情好摆屯,可當我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著糠亩,像睡著了一般虐骑。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上赎线,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天廷没,我揣著相機與錄音,去河邊找鬼垂寥。 笑死颠黎,一個胖子當著我的面吹牛另锋,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播狭归,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼夭坪,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了过椎?” 一聲冷哼從身側(cè)響起室梅,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎疚宇,沒想到半個月后亡鼠,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡敷待,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年拆宛,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片讼撒。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡浑厚,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出根盒,到底是詐尸還是另有隱情钳幅,我是刑警寧澤,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布炎滞,位于F島的核電站敢艰,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏册赛。R本人自食惡果不足惜钠导,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望森瘪。 院中可真熱鬧牡属,春花似錦、人聲如沸扼睬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽窗宇。三九已至措伐,卻和暖如春军俊,著一層夾襖步出監(jiān)牢的瞬間侥加,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工粪躬, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留担败,地道東北人矗蕊。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像氢架,于是被迫代替她去往敵國和親傻咖。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容