本文內(nèi)容部分來自Trident Tutorial。
Trident是基于Storm的實時計算模型的高級抽象听诸。它可以實現(xiàn)高吞吐(每秒數(shù)百萬條消息)的有狀態(tài)流處理和低延遲分布式查詢。如果以前使用過高級批處理工具(比如Pig或Cascading),則對Trident的概念會非常熟悉,比如連接枢纠、聚合、分組黎棠、功能處理和過濾等晋渺。除此之外,Trident還增加了用于在數(shù)據(jù)庫或持久化存儲上進行有狀態(tài)的增量處理的原語脓斩。Trident具有一致性木西、一次性語義,所以很容易就能夠推導(dǎo)出Trident拓撲結(jié)構(gòu)随静。
Trident的出現(xiàn)算是程序猿非常懶的又一個鐵證八千。Strom是一個實時流處理工具,有很高的吞吐燎猛。在實際應(yīng)用場景中恋捆,很多場景是借助這種實時處理能力,對實時數(shù)據(jù)進行統(tǒng)計重绷,然后將統(tǒng)計結(jié)果實時推送到大屏或者其他可以實時瀏覽的地方沸停,這樣領(lǐng)導(dǎo)或者活動運營就可以實時查看銷售或活動情況,比如论寨,雙十一時候的大屏星立,就可以使用Storm來做(我們現(xiàn)在就是這樣做的爽茴,把全渠道的銷售情況進行實時統(tǒng)計葬凳,然后顯示在大屏上绰垂,據(jù)說領(lǐng)導(dǎo)會看)。然后火焰,程序猿們就發(fā)現(xiàn)劲装,很多統(tǒng)計功能非常類似,所以進行抽象昌简,使用更加高級的功能代替一個一個的Spout占业、Bolt(當然,Trident拓撲結(jié)構(gòu)運行的時候也是解析成Spout和Bolt運行)纯赎。
然后又有人發(fā)現(xiàn)谦疾,Trident這種方式也是比較麻煩,即使程序猿們通過高級抽先的Trident省去了很多麻煩犬金,但是還是架不住運維念恍、運營、產(chǎn)品等不斷改變的需求晚顷,所以就有很多SQL方式解析為Trident或普通Topology的工具產(chǎn)生峰伙。既然運維、運營该默、產(chǎn)品等不斷修改需求瞳氓,那就簡單的通過SQL查詢(不同的SQL解析為不同的拓撲結(jié)構(gòu),在Storm中運行栓袖,可以得出不同的結(jié)果)匣摘。比如:squall。
這些都是題外話裹刮,下面繼續(xù)說Trident音榜。
1 一個例子
接下來看一個Trident的例子:
- 統(tǒng)計輸入句子中單詞數(shù)量
- 實現(xiàn)單詞統(tǒng)計結(jié)果的查詢
首先實現(xiàn)一個不斷發(fā)送句子的Spout:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
上面的Spout循環(huán)發(fā)送句子流,下面是計算流中單詞數(shù)量的代碼必指,也就是Trident的本體:
TridentTopology topology = new TridentTopology(); // 1
TridentState wordCounts =
topology.newStream("spout1", spout) // 2
.each(new Fields("sentence"), new Split(), new Fields("word")) // 3
.groupBy(new Fields("word")) // 4
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) // 5
.parallelismHint(6);
一步步說下代碼:
行1囊咏,創(chuàng)建TridentTopology
對象topology,這個對象就是Trident計算的入口塔橡。
行2梅割,TridentTopology
的newStream
方法是定義Trident的輸入源,這里使用的是一開始定義的FixedBatchSpout
對象葛家。當然户辞,輸入源也可以是Kafka之類的隊列Broker(這個實在不知道應(yīng)該翻譯成什么,不過好在是說Broker癞谒,大家就都明白是干啥的)底燎。這個地方刃榨,Trident會在Zookeeper中跟蹤每個輸入源的狀態(tài)元數(shù)據(jù)。也就是Trident中比較重要的狀態(tài)的概念双仍,這個后面再說枢希。Trident會將輸入的流分成更小的批量數(shù)據(jù)(這里比較繞口,可以理解為Trident的入口進來一個大的批量數(shù)據(jù)朱沃,然后Trident把這個大的批量數(shù)據(jù)進行分割苞轿,變成一堆小的批量數(shù)據(jù),比如進來的是1000條逗物,分割成10個100條)進行處理搬卒,比如,把傳入的流分成下面的樣子:
通常翎卓,小批量數(shù)據(jù)的數(shù)量會是數(shù)千或數(shù)百萬個契邀,這取決于吞吐量。
Trident提供了一整套完整的批量處理API來處理這些小批量數(shù)據(jù)失暴。類似于Hadoop的高級抽象中處理Pig或Cascading的內(nèi)容:分組坯门、連接、聚合锐帜、功能操作田盈、過濾等。當然缴阎,分別處理每個小的批量數(shù)據(jù)并不容易(使用Hadoop處理大的矩陣乘法的就會深有體會)允瞧,所以Trident提供了跨批次進行聚合處理的功能,并可以將這些聚合結(jié)果持久化在內(nèi)存中蛮拔、Memcached述暂、Cassandra或其他存儲中。Trident還具有一流的實時狀態(tài)查詢功能建炫,這個狀態(tài)可以有Trident更新畦韭,或者其他獨立的狀態(tài)來源。
行3肛跌,Spout發(fā)出包含名為sentence的field的流艺配。通過使用Split
函數(shù),對每個tuple進行處理衍慎,把名為sentence的字段分割為一個個單詞转唉。將分割的單詞命名為word,繼續(xù)向下分發(fā)稳捆。下面是Split
的定義:
public class Split extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}
行4赠法,Trident進行了單詞計數(shù)和結(jié)果的持續(xù)存儲。先對word字段進行分組乔夯,然后用Count
聚合器持續(xù)聚合砖织。
Trident的一個很強的特性是能夠完全容錯和一次性處理語義款侵。Trident能夠保持狀態(tài),如果發(fā)生故障需要重試侧纯,則不會對同一源數(shù)據(jù)多次更新數(shù)據(jù)新锈。
行5,persistentAggregate
函數(shù)實現(xiàn)了存儲和更新聚合結(jié)果的功能茂蚓,不需要操心壕鹉。例子中剃幌,計數(shù)結(jié)果保存在內(nèi)存中聋涨,當然也可以使用Memcached、Cassandra或其他持久化存儲负乡。這里先不做討論牍白。persistentAggregate
方法將Stream
轉(zhuǎn)換為TridentState
對象。在這里抖棘,TridentState
對象表示所有單詞計數(shù)茂腥,然后使用TridentState
對象來實現(xiàn)分布式查詢。
接下來實現(xiàn)對單詞計數(shù)實現(xiàn)低延遲分布式查詢切省,以空格分割的單詞列表作為輸入最岗,返回這些單詞的計數(shù)總和。這個查詢會在后臺做并行化處理朝捆,其他的與普通的RPC調(diào)用一樣般渡。比如像下面這樣調(diào)用:
DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"
如上面的代碼,除了它是在Storm集群中并行執(zhí)行外芙盘,與普通的RPC調(diào)用沒什么區(qū)別驯用。通常簡單的RPC查詢,延遲在10ms左右儒老,復(fù)雜的DRPC查詢可能需要更長的時間蝴乔,具體的時間取決于計算被分配的資源多少。
拓撲中分布式查詢部分的實現(xiàn)如下所示:
topology.newDRPCStream("words")
.each(new Fields("args"), new Split(), new Fields("word")) // 6
.groupBy(new Fields("word")) // 7
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) // 8
.each(new Fields("count"), new FilterNull()) // 9
.aggregate(new Fields("count"), new Sum(), new Fields("sum")); // 10
使用同一個TridentTopology
對象來創(chuàng)建DRPC流驮樊,命名該函數(shù)為words薇正,函數(shù)名與DRPCClient
執(zhí)行時的第一個參數(shù)名稱相同。每個DRPC請求都是一個小的批量作業(yè)囚衔,將請求的單個tuple作為輸入挖腰,tuple中包含名為args的字段,args中包含了客戶端的請求參數(shù)佳魔。在這個例子中曙聂,請求參數(shù)就是“cat dog the man”。
行6鞠鲜,通過Split
函數(shù)將請求參數(shù)分解為一個個單詞宁脊,組成“word”流断国。
行7,將上一步分解的“word”流分組榆苞。
行8稳衬,stateQuery
方法用于查詢第一部分生成的TridentState
對象。MapGet
將被執(zhí)行坐漏,根據(jù)輸入的單詞薄疚,查詢單詞的數(shù)量。因為DRPC流的分組方式與TridentState
分組方式相同(都是通過word字段)赊琳,所以每個單詞查詢會被自動路由到該單詞的TridentState
對象的分區(qū)街夭。
行9,通過FilterNull
函數(shù)過濾掉沒有計數(shù)結(jié)果的單詞躏筏。
行10板丽,使用Sum
函數(shù)對存在計數(shù)結(jié)果的進行加和,然后通過Trident
自動將結(jié)果返回客戶端趁尼。
Trident
在如何以最大限度的提高性能來執(zhí)行拓撲方面是非常智能的埃碱。比如,它會進行下面兩個提高性能的自動化操作:
- 對狀態(tài)的讀取或?qū)懭氩僮鳎ㄈ?code>persistentAggregate和
stateQuery
)酥泞,會自動的進行批處理砚殿。比如,如果需要在當前批處理操作中執(zhí)行20次更新操作芝囤,Trident
會自動批量的讀取或?qū)懭胨蒲祝瑑H執(zhí)行一次讀請求或?qū)懻埱螅皇?0次凡人。 -
Trident
聚合操作進行了大量優(yōu)化名党。Trident
會將同一個組中的所有tuple發(fā)送到同一臺機器,進行部分聚合操作挠轴,然后再通過網(wǎng)絡(luò)發(fā)送tuple传睹。比如,Count
聚合操作會先計算每個分區(qū)的數(shù)量岸晦,然后將這些統(tǒng)計結(jié)果通過網(wǎng)絡(luò)傳輸?shù)揭黄鹋菲。俑鶕?jù)這些初始統(tǒng)計結(jié)果計算最后的結(jié)果。
2 再來個例子
下面的例子是一個純DRPC拓撲启上,用于計算URL的覆蓋范圍邢隧,就是在Twitter上發(fā)布的URL影響的范圍。要計算這個數(shù)據(jù)冈在,需要先獲取所有推送URL的人倒慧,然后獲取所有這些人的粉絲,去重,計算總數(shù)纫谅。這種計算需要消耗非常多的資源炫贤,可能需要數(shù)千次數(shù)據(jù)庫查詢操作和數(shù)千萬的tuple(當然,這是針對Twitter這種應(yīng)用體量來說的付秕。如果用戶只有幾個兰珍,一個SQL估計就出結(jié)果了)。所以就需要Storm和Trident這種可以并行化跨集群的計算询吴。
下面這個拓撲會從兩個狀態(tài)讀取數(shù)據(jù):一個是將URL與分享過該URL的人做的映射掠河,另一個是將一個人與這個人的粉絲做的映射。查詢拓撲如下:
TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState());
topology.newDRPCStream("reach")
.stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")) // 1
.each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) // 2
.shuffle()
.stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) // 3
.parallelismHint(200)
.each(new Fields("followers"), new ExpandList(), new Fields("follower")) // 4
.groupBy(new Fields("follower")) // 5
.aggregate(new One(), new Fields("one")) // 6
.parallelismHint(20)
.aggregate(new Count(), new Fields("reach"));
該拓撲使用newStaticState
方法創(chuàng)建從外部數(shù)據(jù)庫獲取的TridentState
對象猛计,然后在拓撲中進行查詢唠摹。與其他類似,這些數(shù)據(jù)庫查詢可以自動批量化有滑,以求效率最大化跃闹。下面分解來說:
行1,將前面從urlToTweeters數(shù)據(jù)庫讀取的數(shù)據(jù)作為輸入毛好,通過MapGet
函數(shù)進行處理。
行2苛秕,通過ExpandList
函數(shù)將每個人分解為不同的tuple數(shù)據(jù)肌访。
行3,將前面從tweeterToFollowers數(shù)據(jù)庫讀取的數(shù)據(jù)作為輸入艇劫,與行2中的數(shù)據(jù)吼驶,通過MapGet
函數(shù)進行處理,查詢每個人的粉絲列表店煞。這里最重要的就是并行話蟹演,所以需要shuffle
來將所有人均勻分布到所有的workder
上∏牦埃可以看到在下面的parallelismHint
中酒请,并行數(shù)是200,說明該操作并行程度非常高鸣个,占據(jù)了計算的大部分資源羞反。
行4、行5囤萤、行6昼窗,這里是對粉絲進行單獨統(tǒng)計和計數(shù)。首先是粉絲列表數(shù)據(jù)通過ExpandList
函數(shù)進行分解為不同tuple數(shù)據(jù)涛舍,然后將粉絲數(shù)據(jù)進行分組澄惊,在對粉絲數(shù)據(jù)進行One
聚合。這里也是通過parallelismHint
進行并行計算。
One
聚合器定義如下:
public class One implements CombinerAggregator<Integer> {
public Integer init(TridentTuple tuple) {
return 1;
}
public Integer combine(Integer val1, Integer val2) {
return 1;
}
public Integer zero() {
return 1;
}
}
這是一個combiner
聚合器掸驱,它能夠先進行部分聚合窘哈,然后通過網(wǎng)絡(luò)傳輸tuple進行最后的聚合,以最大限度地提高效率亭敢。
接下來說說Trident的數(shù)據(jù)結(jié)構(gòu)滚婉。
3 字段和tuple數(shù)據(jù)
Trident中用來傳輸?shù)臄?shù)據(jù)模型名為TridentTuple
,是一個命名的值列表帅刀。在一個拓撲中让腹,tuple通過一系列的操作逐步創(chuàng)建。這些操作就是將輸入數(shù)據(jù)進行處理扣溺,然后返回輸出數(shù)據(jù)骇窍。
比如,有一個名為“stream”的流锥余,它包含字段“x”腹纳,“y”和“z”。 要運行一個以“y”為輸入的過濾器MyFilter驱犹,可以這樣做:
stream.each(new Fields("y"), new MyFilter())
MyFilter
定義如下:
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(0) < 10;
}
}
這里保留了“y”字段小于10的所有tuple數(shù)據(jù)嘲恍。MyFilter
輸入的TridentTuple
只有“y”字段。Trident可以高效的選擇一組tuple作為輸入雄驹,這種選擇是0消耗的佃牛。
接下來看看function fields
是如何工作的,不如下面這個函數(shù):
public class AddAndMultiply extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
int i1 = tuple.getInteger(0);
int i2 = tuple.getInteger(1);
collector.emit(new Values(i1 + i2, i1 * i2));
}
}
這個操作是將兩個數(shù)字作為數(shù)據(jù)医舆,然后發(fā)出兩個新值:兩數(shù)字的和與積俘侠。比如有“x”,“y”和“z”三個字段的流蔬将,可以如下操作:
stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));
function
函數(shù)操作的輸出結(jié)果是追加在輸入tuple中的爷速,因此經(jīng)過AddAndMultiply
操作之后輸出tuple包含五個字段:“x”,“y”霞怀,“z”惫东,“added”和“multipl”±锓常“added”對應(yīng)的是AddAndMultiply
發(fā)出的第一個值凿蒜,"multiplied"對應(yīng)的是第二個值。
aggregator
聚合操作是替換tuple數(shù)據(jù)胁黑,比如废封,有一個包含“val1”和“val2”的流:
stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
輸入結(jié)果將只包含“sum”一個字段的tuple,表示“val2”的和丧蘸。
groupBy
分組操作漂洋,輸出結(jié)果將包含分組字段和聚合器發(fā)出的字段遥皂,比如:
stream.groupBy(new Fields("val1"))
.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
例子中,輸出結(jié)果包含“val2”和“sum”兩個字段刽漂。
4 狀態(tài)
實時計算解決問題的一個關(guān)鍵是如何管理狀態(tài)演训,以便在失敗或重試時能夠?qū)崿F(xiàn)冪等性。消除實時計算過程中的故障是不可能的贝咙,所以样悟,當一個節(jié)點死亡或其他問題出現(xiàn)時,就需要重試庭猩。問題是窟她,如何更新狀態(tài)(通過外部存儲或拓撲內(nèi)部狀態(tài)),以便每個消息只處理一次蔼水。
這是一個棘手的問題震糖,有一下幾種情況。假設(shè)趴腋,如果要做一個計數(shù)的聚合操作吊说,然后將技術(shù)存儲在數(shù)據(jù)庫中。如果只是將結(jié)果存儲优炬,當需要更新結(jié)果的時候颁井,無法知道這個結(jié)果是否是新結(jié)果還是重試結(jié)果。該結(jié)果可能是之前嘗試過更新過的穿剖,已經(jīng)成功更新了數(shù)據(jù)庫蚤蔓,但是后續(xù)步驟中失敗了。也可能是嘗試更新數(shù)據(jù)庫的時候糊余,更新數(shù)據(jù)庫失敗了。
Trident通過下面的方式解決這個問題:
- 每個批次的數(shù)據(jù)被賦予唯一的ID单寂,稱為“transaction id”贬芥。如果這批數(shù)據(jù)重新計算,會攜帶相同的ID宣决。
- 批次將狀態(tài)隔離蘸劈。在批次2狀態(tài)更新之前,不會更新批次3的狀態(tài)尊沸。
更加這兩個原子操作威沫,可以通過狀態(tài)更新完成一次語義操作。需要將事務(wù)ID與計數(shù)結(jié)果一起存儲洼专,作為原子值婉弹。然后搂抒,當更新操作時,通過數(shù)據(jù)庫中的事務(wù)ID與當前批次的事務(wù)ID進行比較。如果相同肠虽,說明是同一批次的數(shù)據(jù)旁舰,就跳過更新。如果不同,就增加計數(shù)恤筛。
當然,不需要在拓撲中手動執(zhí)行這個邏輯芹橡,這個邏輯包裝在State中并自動完成毒坛。而且,State 狀態(tài)對象也不是實現(xiàn)事務(wù)ID所必需的林说,如果不想在數(shù)據(jù)庫中存儲事務(wù)ID煎殷,也可以不存儲。這樣的話述么,State 在失敗的情況下也會至少執(zhí)行一次(可能會更好)蝌数。可以在這里學(xué)習(xí)更多的關(guān)于State狀態(tài)的內(nèi)容度秘。
State 狀態(tài)可以存儲在任何地方顶伞,外部存儲,內(nèi)部狀態(tài)等剑梳。如果想使用一個內(nèi)存狀態(tài)實現(xiàn)唆貌,保留幾個小時的數(shù)據(jù)可用,然后就將其丟棄垢乙,可以看這里锨咙。
5 執(zhí)行拓撲
Trident拓撲結(jié)果最后會編譯成高效的Strom拓撲。當需要重新分區(qū)數(shù)據(jù)時追逮,比如groupBy
或shuffle
酪刀,tuple就只能通過網(wǎng)絡(luò)傳輸。所以如果Trident拓撲結(jié)構(gòu)如下:
最后會被編譯成Strom spout/bolt的結(jié)構(gòu)钮孵,如下:
6 結(jié)論
Trident 使實時計算更加優(yōu)雅骂倘。可以通過Trident的API實現(xiàn)高吞吐的流處理巴席、狀態(tài)處理历涝、低延遲查詢等功能。而且Trident中做了很多優(yōu)化漾唉,可以獲取最大性能荧库。
個人主頁: http://www.howardliu.cn
個人博文: storm筆記:Trident應(yīng)用
CSDN主頁: http://blog.csdn.net/liuxinghao
CSDN博文: storm筆記:Trident應(yīng)用