原文鏈接:http://storm.apache.org/releases/1.0.2/Trident-tutorial.html
本人原創(chuàng)翻譯摊崭,轉(zhuǎn)載請注明出處
Trident是基于Storm做實時計算的高等級的抽象讼油。它允許你無縫集成高吞吐量(每秒100萬級別的消息)、無狀態(tài)流處理呢簸、低延時的分布式查詢矮台。 如果你熟悉Pig或Cascading等高級別的批處理工具,就會很熟悉Trident的概念——Trident有joins, aggregations, grouping, functions, and filters阔墩。除此以外帽蝶,Trident原生支持任何數(shù)據(jù)庫或持久化存儲之上的有狀態(tài)的躺酒、增加的處理。Trident有一致的堂鲜、僅一次的語義伞芹,所以很容易推出Trident topologies忘苛。(最后這句不是很理解,請參考原文)唱较。
示例
這個例子會做兩件事:
1.從輸入句子流中計算單詞的數(shù)量
2.實現(xiàn)查詢:給出單詞列表扎唾,返回單詞數(shù)量之和
這個例子從下面的數(shù)據(jù)源中讀取無限的句子流:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
下面是做單詞計數(shù)的代碼:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(6);
我們一行一行的看代碼。第一行南缓,TridentTopology對象被創(chuàng)建胸遇,暴露出構(gòu)成Trident計算的接口。TridentTopology 有一個方法newStream汉形,讀取輸入纸镊,創(chuàng)建一個新的流。在這里概疆,輸入源就是之前定義的FixedBatchSpout逗威。輸入源也可以是Kestrel或Kafka這樣的消息隊列。對每個輸入源岔冀,Trident跟蹤一個較小數(shù)量的狀態(tài)(關(guān)于已消費消息的元數(shù)據(jù))凯旭,狀態(tài)信息存放在zookeeper中。zookeeper的節(jié)點根據(jù)字符串"spout1"來確定狀態(tài)元數(shù)據(jù)的存放位置。
Trident以小批量tuples的形式處理流罐呼,例如鞠柄,輸入的句子流可能會被拆分成batchs,像這樣:
batchs的大小與輸入流的大小掛鉤弄贿,輸入流吞吐量大春锋,batchs就會大。
Trident提供了成熟的API來處理這些小型batchs差凹。API和Pig或Cascading的很像:可以做group by's, joins, aggregations, 運行functions, 運行filters等等期奔。當然,孤立的處理每個小型batch沒什么意義危尿,所以Trident提供了多個batchs聚合及持久化的函數(shù)——包括內(nèi)存, Memcached, Cassandra呐萌,等各種存儲。最后谊娇,Trident有一流的查詢實時狀態(tài)源的函數(shù)肺孤,狀態(tài)可以被Trident 更新(就像這個例子),或者也可能是獨立的狀態(tài)源济欢。
回到例子赠堵,spout發(fā)出"sentence"流,topology 定義的下一行啟用了Split函數(shù)法褥,把句子轉(zhuǎn)成單詞茫叭。 Split定義:
public class Split extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}
topology剩下的部分計算單詞數(shù)量并持久化存儲。首先以"word"字段分組半等,然后每個分組以Count聚合器持久化聚合揍愁。persistentAggregate 函數(shù)知道如何存儲聚合的結(jié)果。這個例子中杀饵,單詞計數(shù)保存在內(nèi)存中莽囤,但是也可以輕易的切換到Memcached, Cassandra等其他持久化。例如切換到Memcached:
.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))
MemcachedState.transactional()
"serverLocations"是Memcached集群的host/port列表切距。
Trident 很酷的一點是它有完全的容錯和“僅一次”處理機制朽缎。這會使你的實時處理更易理解(reason about)。當失敗和重傳發(fā)生的時候谜悟,Trident 保存的狀態(tài)使它不會為同一個數(shù)據(jù)多次更新數(shù)據(jù)庫话肖。
persistentAggregate把流傳輸給TridentState 對象,這個例子中赌躺,TridentState 代表了所有的單詞計數(shù)狼牺,我們將使用它來進行分布式查詢。
topology 的下一個部分實現(xiàn)了單詞計數(shù)的低延時分布式查詢礼患。輸入是以空格分隔的單詞組是钥,輸出這些單詞的數(shù)量總和掠归。這些查詢像普通RPC調(diào)用的一樣執(zhí)行,不過是以分布式形式悄泥。例如:
DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"
像這種簡單查詢的延時大概10ms虏冻。
topology 的分布式查詢實現(xiàn)如下:
topology.newDRPCStream("words")
.each(new Fields("args"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
.each(new Fields("count"), new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));