在storm筆記:Trident應(yīng)用中說了下Trident的使用弱判,這里說下Trident幾種狀態(tài)的變化及其對應(yīng)API的使用。
本文內(nèi)容來自Trident State锥惋,部分內(nèi)容根據(jù)實際情況做出修改昌腰。
Trident中有對狀態(tài)數(shù)據(jù)進(jìn)行讀取和寫入操作的一流抽象工具。狀態(tài)既可以保存在拓?fù)鋬?nèi)部膀跌,比如保存在內(nèi)容中并由HDFS存儲遭商,也可以通過外部存儲(比如Memcached或Cassandra)存儲在數(shù)據(jù)庫中。而對于Trident的API而言捅伤,這兩種機制沒有任何區(qū)別劫流。
Trident以容錯的方式管理狀態(tài),以便在重試或失敗時的狀態(tài)更新是冪等的。在大數(shù)據(jù)處理中祠汇,數(shù)據(jù)處理的冪等性是非常重要的一個指標(biāo)仍秤,這樣能夠保證每個消息即使處理了多次,結(jié)果也像是只處理了一次一樣座哩。
在進(jìn)行狀態(tài)更新時可能需要各種級別的容錯能力徒扶,在這之前,我們來看一個例子說明實現(xiàn)“恰好一次”語義所需的技巧根穷。比如谓着,正在對流中的數(shù)據(jù)進(jìn)行計數(shù)聚合操作,每次處理新的元組時滔金,都會將運行的計數(shù)結(jié)果存儲在數(shù)據(jù)庫中霉涨。
如果發(fā)生故障時,元組將重新執(zhí)行計數(shù)操作尘惧。這就會在執(zhí)行狀態(tài)更新時出現(xiàn)問題康栈,因為這個時候不知道是不是已經(jīng)更新過該元組狀態(tài)。也許還沒有處理該元組數(shù)據(jù)喷橙,這個時候就需要增加計數(shù)啥么。也許已經(jīng)處理該元組,并成功增加計數(shù)贰逾,但是在下一步的時候出現(xiàn)問題悬荣,這種情況下,就不應(yīng)該增加計數(shù)疙剑。也有可能是處理元組正常氯迂,更新計數(shù)是異常,這個時候就需要更新計數(shù)言缤。
所以說嚼蚀,如果只是在數(shù)據(jù)庫中存儲計數(shù)信息,就不知道元組是否已經(jīng)處理過管挟。因此轿曙,就需要更多的信息作為輔助。Trident提供了下面三個性質(zhì)僻孝,來實現(xiàn)“恰好一次”的處理:
- 元組都是以小批次處理
- 每批元組都會給出一個唯一ID拳芙,稱為事務(wù)ID(transaction id,txid)皮璧。如果批次重復(fù)處理舟扎,txid也會相同。
- 狀態(tài)的更新操作是按照元組批次的順序執(zhí)行的悴务。也就是說睹限,在批次2狀態(tài)更新成功之前譬猫,不會進(jìn)行批次3的狀態(tài)更新。
根據(jù)這些特性羡疗,就可以通過檢查到該元組的批次是否已被處理染服,并根據(jù)檢測結(jié)果采取適當(dāng)?shù)牟僮鞲聽顟B(tài)了。采取的具體操作取決于Spout的類型叨恨。Spout有三種類型:“非事務(wù)型(non-transactional)”柳刮,“事務(wù)型(transactional)”和“不透明事務(wù)型(opaque transactional)”。對應(yīng)的容錯能力也是三種:“非事務(wù)”痒钝,“事務(wù)”和“不透明事務(wù)”秉颗。下面來看看Spout的各個類型及對應(yīng)的容錯能力。
事務(wù)型Spout
Trident是按照批次發(fā)送元組進(jìn)行處理的送矩,每個批次的元組被賦予唯一的事務(wù)ID蚕甥。Spout的特性根據(jù)他們所提供容錯性保證機制來決定的,而且這種機制也會對每個批次發(fā)生作用栋荸。事務(wù)型Spout有如下特性:
- 每個批次的txid不變菇怀,對于一個特定的txid,重復(fù)執(zhí)行時晌块,它所包含的元組數(shù)據(jù)與第一次完全相同爱沟。
- 元組只會在一個批次出現(xiàn),不會重復(fù)(某個元組只會出現(xiàn)在一個批次中匆背,不會出現(xiàn)在多個批次中)呼伸。
- 每個元組都會出現(xiàn)一次(不會遺漏任何的元組數(shù)據(jù))
這是最簡單最容易理解的一種Spout類型,數(shù)據(jù)流被分割成固定的批次靠汁。storm中有與Kafka集成的事務(wù)型Spout的擴展蜂大,代碼在這里闽铐。
既然事務(wù)型Spout這么簡單易懂蝶怔,為什么不在Trident中完全使用事務(wù)型Spout呢?其實就在于它的容錯能力兄墅。比如踢星,TransactionalTridentKafkaSpout
的工作方式是,同一個txid的批次中將包含kafka所有分區(qū)的元組隙咸。一旦某個批次發(fā)出后沐悦,出現(xiàn)異常,需要重新發(fā)出五督,就需要完全相同的元組集合才能滿足事務(wù)型Spout要求的語義藏否。但是這個時候,kafka某個節(jié)點異常(節(jié)點關(guān)閉或分區(qū)不可用)充包,就無法獲取完全相同的的一批元組副签,那整個拓?fù)渚蜁?yīng)為第3條語義(批次按順序執(zhí)行)停止遥椿。
這就是要有“不透明事務(wù)型”Spout的原因了,它能夠容忍數(shù)據(jù)源節(jié)點丟失淆储,而且又能保證數(shù)據(jù)恰好被操作一次冠场。
注:對kafka比較熟悉的應(yīng)該會想到,如果某一個topic支持復(fù)制本砰,那即使一個節(jié)點不可用碴裙,還會有其他復(fù)制節(jié)點頂上,那TransactionalTridentKafkaSpout也能夠避免上面的問題点额。
下面繼續(xù)看看如何設(shè)計一個支持恰好一次特性的“事務(wù)型”Spout語義(簡單的說就是同一個txid對應(yīng)的批次元組數(shù)據(jù)完全一致)的狀態(tài)實現(xiàn)舔株,這種狀態(tài)稱為“事務(wù)型狀態(tài)”。
比如咖楣,現(xiàn)在有一個單詞計數(shù)的拓?fù)涠桨剩枰獙卧~計數(shù)存儲在key/value數(shù)據(jù)庫中。key是單詞诱贿,value中包含單詞數(shù)量娃肿。另外,為了確定同一批次元組是否已經(jīng)被執(zhí)行珠十,需要將txid也一同存儲在value中料扰。這樣,當(dāng)需要更新單詞數(shù)量的時候焙蹭,先比較txid是否相同晒杈,如果相同,就跳過更新孔厉。如果不同拯钻,就更新計數(shù)。
考慮這個為什么它工作的例子撰豺。 假設(shè)您正在處理由以下批次元組組成的txid 3:
比如粪般,要處理一個txid是3的一批元組:
["man"]
["man"]
["dog"]
目前數(shù)據(jù)庫中存儲的數(shù)據(jù)為:
man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
在這個時候,發(fā)現(xiàn)“man”對應(yīng)的txid是1污桦,當(dāng)前的txid是3亩歹,就可以更新了。然后“dog”對應(yīng)的txid是3凡橱,說明同一批次的元組數(shù)據(jù)已經(jīng)發(fā)送過了小作,就不需要更新。從這點可以看出稼钩,txid是3的批次元組是重復(fù)發(fā)送的顾稀,在更新“dog”數(shù)量后,在更新“man”數(shù)量前坝撑,出現(xiàn)了錯誤静秆。最后的結(jié)果就是:
man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
不透明事務(wù)型Spout
前面已經(jīng)提過氮块,不透明事務(wù)型Spout不能保證相同txid對應(yīng)的批次中的元組數(shù)據(jù)完全一致。其特點如下:
- 每個元組都會在有且僅有一個批次中處理成功诡宗。
[OpaqueTridentKafkaSpout](http://github.com/apache/storm/tree/v1.1.0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java)
具有這種特性滔蝉,同時對kafka節(jié)點異常有很好的容錯性。OpaqueTridentKafkaSpout
在發(fā)送一個批次元組的時候塔沃,會從上次成功之后的位置開始發(fā)送蝠引,這樣就能夠保證元組不會漏發(fā)或重發(fā)。
基于上面的特點蛀柴,不透明事務(wù)型Spout就不同通過txid來直接判斷是否可以跳過狀態(tài)更新螃概,因為具有相同txid的批次中元組可能發(fā)生了變化。
這就需要存儲更多的狀態(tài)信息了鸽疾,而不僅僅是一個結(jié)果和一個txid了吊洼,還需要存儲前一個結(jié)果值。
比如制肮,當(dāng)前批次的計數(shù)是2冒窍,需要進(jìn)行一次狀態(tài)更新,數(shù)據(jù)庫中的數(shù)據(jù)如下:
{
"value": 4,
"prevValue": 1,
"txid": 2
}
如果當(dāng)前的txid是3豺鼻,與數(shù)據(jù)庫中的不同综液。在這種情況下,需要將prevValue
的值該為value
的值儒飒,value
的值增加2谬莹,更新txid
為3,最后的結(jié)果就是:
{
"value": 6,
"prevValue": 4,
"txid": 3
}
如果當(dāng)前的txid是2桩了,等于數(shù)據(jù)庫中的txid附帽。因為txid相同,說明上一次txid為2的批次處理失敗井誉,但是本次的元組可能與上一次不同了蕉扮。這個時候,就需要使用本次數(shù)據(jù)覆蓋上次處理結(jié)果送悔。也就是說慢显,prevValue
值不變爪模,value
的值改為prevValue
加2欠啤,txid
不變,最后的結(jié)果如下:
{
"value": 3,
"prevValue": 1,
"txid": 2
}
這種方式的可行性依賴于Trident的強順序性屋灌。也就是說洁段,一旦開始處理一個新的批次,就不會重復(fù)執(zhí)行上一個批次共郭。不透明事務(wù)型Spout保證了不同批次之間沒有重復(fù)的情況祠丝,也就是每個元組只會在一個批次中處理成功疾呻,所以就可以放心的使用前一個值與當(dāng)前值覆蓋已存數(shù)據(jù)了。
非事務(wù)型Spout
非事務(wù)型Spout不能為批次提供任何保證写半。所以可能出現(xiàn)"至多一次"的處理岸蜗,即在某個批次處理過程中失敗了,但是不會在重新處理叠蝇;也可能提供“至少一次”的處理璃岳,即可能會有多個批次分別處理某個元組。也就是沒有辦法實現(xiàn)“恰好一次”的語義悔捶。
不同類型spout和狀態(tài)總結(jié)
下面是不同的spout/狀態(tài)組合是否支持“恰好一次”處理語義:
不透明事務(wù)狀態(tài)有最強的容錯性铃慷,但是因為存儲txid和兩個結(jié)果帶來更大的開銷。事務(wù)型狀態(tài)只需要存儲一個狀態(tài)結(jié)果蜕该,但是只對事務(wù)型Spout有效犁柜。非事務(wù)型狀態(tài)要求存儲的數(shù)據(jù)更少,但是不能實現(xiàn)“恰好一次”的處理語義堂淡。
所以在選擇容錯與存儲空間中馋缅,需要根據(jù)具體的需要選擇合適的組合。
狀態(tài)API
根據(jù)前面來看绢淀,“恰好一次”語義的原理有些復(fù)雜股囊,但是作為用戶,并不需要了解這些txid對比更啄、多值存儲稚疹,因為Trident已經(jīng)在State中封裝了所有容錯處理邏輯,只需要想下面著用攜帶碼就行:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))
.parallelismHint(6);
所有的不透明事務(wù)狀態(tài)邏輯已經(jīng)封裝在MemcachedState.opaque
中祭务,另外内狗,狀態(tài)更新會自動調(diào)整為批次操作,這樣可以減少與數(shù)據(jù)庫之間反復(fù)交互帶來的資源浪費义锥。
基本的State
接口只有兩個方法:
public interface State {
void beginCommit(Long txid); // 對于像DRPC流發(fā)生的partitionPersist這樣的事情柳沙,可以是null
void commit(Long txid);
}
前面已經(jīng)說過,狀態(tài)更新開始和結(jié)束時都會獲取txid拌倍。Trident并不關(guān)心狀態(tài)如何操作赂鲤,使用哪種方式更新,使用哪種方式讀取柱恤。
假如有一個包含用戶地址信息的定制數(shù)據(jù)庫数初,需要使用Trident與數(shù)據(jù)庫交互,State
擴展類中包含對于用戶信息的getter和setter方法:
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocation(long userId, String location) {
// 向數(shù)據(jù)庫設(shè)置地址信息
}
public String getLocation(long userId) {
// 從數(shù)據(jù)庫中獲取地址信息
}
}
然后就需要一個StateFactory
來創(chuàng)建Trident所需的State
對象梗顺,LocationDB
所需的StateFactory
大體結(jié)構(gòu)如下:
public class LocationDBFactory implements StateFactory {
public State makeState(Map conf, int partitionIndex, int numPartitions) {
return new LocationDB();
}
}
Trident提供了用于查詢狀態(tài)源的QueryFunction
接口泡孩,以及更新狀態(tài)源的StateUpdater
接口。比如寺谤,查詢LocationDB
中用戶信息的QueryLocation
:
TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
.stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"));
QueryLocation
的代碼如下:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
List<String> ret = new ArrayList();
for (TridentTuple input : inputs) {
ret.add(state.getLocation(input.getLong(0)));
}
return ret;
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
collector.emit(new Values(location));
}
}
QueryFunction
操作分為兩步:首先仑鸥,Trident會將收集到的數(shù)據(jù)放在一個批次中吮播,發(fā)送給batchRetrieve
方法。在這個例子中眼俊,batchRetrieve
方法收到的是一些用戶id意狠。batchRetrieve
會返回一組與輸入元組長度相同的結(jié)果。輸入元組與輸出結(jié)果中各個元素是彼此對應(yīng)的疮胖。
從這點來看摄职,上面的LocationDB
類并沒有發(fā)揮Trident批處理優(yōu)勢,所以需要盡心改造:
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocationsBulk(List<Long> userIds, List<String> locations) {
// set locations in bulk
}
public List<String> bulkGetLocations(List<Long> userIds) {
// get locations in bulk
}
}
對應(yīng)的QueryLocation
類如下:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
List<Long> userIds = new ArrayList<Long>();
for (TridentTuple input : inputs) {
userIds.add(input.getLong(0));
}
return state.bulkGetLocations(userIds);
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
collector.emit(new Values(location));
}
}
這段代碼大幅減少了數(shù)據(jù)庫操作获列。
對于更新狀態(tài)谷市,可以使用StateUpdater
接口。比如下面的更新操作:
public class LocationUpdater extends BaseStateUpdater<LocationDB> {
public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
List<Long> ids = new ArrayList<Long>();
List<String> locations = new ArrayList<String>();
for (TridentTuple t : tuples) {
ids.add(t.getLong(0));
locations.add(t.getString(1));
}
state.setLocationsBulk(ids, locations);
}
}
對應(yīng)的更新操作拓?fù)渲芯涂梢允沁@樣:
TridentTopology topology = new TridentTopology();
TridentState locations =
topology.newStream("locations", locationsSpout)
.partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater());
partitionPersist
方法會更新狀態(tài)击孩,StateUpdater
接口接收一批元組和狀態(tài)信息迫悠,然后更新狀態(tài)。上面的LocationUpdater
類中僅僅是從元組中抓取用戶id和地址信息巩梢,然后對狀態(tài)執(zhí)行批量處理创泄。然后,partitionPersist
會返回一個表示更新狀態(tài)后的TridentState
對象括蝠。隨后就可以在拓?fù)涞钠渌胤绞褂?code>stateQuery方法查詢狀態(tài)鞠抑。
在StateUpdater
的updateState
方法中有一個TridentCollector
參數(shù),這個對象是可以將發(fā)送進(jìn)來的元組發(fā)送到一個新的數(shù)據(jù)流中忌警。在這個例子中沒有用到搁拙。如果需要進(jìn)行比如向數(shù)據(jù)庫更新計數(shù)值的后續(xù)操作,可以通過TridentState#newValuesStream
方法獲取新的數(shù)據(jù)流數(shù)據(jù)法绵。
persistentAggregate
Trident使用一個名為persistentAggregate
的方法更新狀態(tài)箕速。前面已經(jīng)出現(xiàn)過,這里再寫一遍:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
partitionPersist
是一個接收Trident聚合器作為參數(shù)并對狀態(tài)數(shù)據(jù)進(jìn)行更新的方法朋譬,persistentAggregate
就是構(gòu)建于partitionPersist
上層的一個編程抽象盐茎。在這個例子中,通過groupBy
返回一個分組數(shù)據(jù)徙赢,Trident需要一個實現(xiàn)MapState
接口的對象字柠。分組字段是狀態(tài)的key,聚合結(jié)果是狀態(tài)的value狡赐。MapState
接口如下:
public interface MapState<T> extends State {
List<T> multiGet(List<List<Object>> keys);
List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
void multiPut(List<List<Object>> keys, List<T> vals);
}
如果你需要在未分組的數(shù)據(jù)流上執(zhí)行聚合操作時窑业,Trident需要一個實現(xiàn)Snapshottable
接口的對象:
public interface Snapshottable<T> extends State {
T get();
T update(ValueUpdater updater);
void set(T o);
}
MemoryMapState 和 MemcachedState 都實現(xiàn)了這些接口.
實現(xiàn)MapState接口
實現(xiàn)MapState
接口非常簡單,Trident幾乎把所有事都做完了阴汇。OpaqueMap
数冬、TransactionalMap
和NonTransactionalMap
都分別實現(xiàn)了各自的容錯語義节槐。只需要為這些類提供一個用于對不同key/value進(jìn)行批量獲取搀庶、批量修改的IBackingMap
實現(xiàn)就行拐纱。IBackingMap
接口如下:
public interface IBackingMap<T> {
List<T> multiGet(List<List<Object>> keys);
void multiPut(List<List<Object>> keys, List<T> vals);
}
OpaqueMap
會使用OpaqueValue作為vals參數(shù)調(diào)用multiPut
方法;TransactionalMap
會使用TransactionalValue作為參數(shù)哥倔;NonTransactionalMaps
會直接把拓?fù)鋵ο髠魅搿?/p>
Trident還提供了CachedMap類來實現(xiàn)key/value的自動LRU緩存操作秸架。
最后,Trident還提供了SnapshottableMap類咆蒿,該類通過將全局聚合的結(jié)果存入一個固定key中的方法將MapState
對象轉(zhuǎn)化為Snapshottable
對象东抹。
可以參考MemcachedState的實現(xiàn)來了解如何將這些工具結(jié)合在一起來提供一個高性能的MapState
實現(xiàn)。MemcachedState
支持不透明事務(wù)沃测、事務(wù)和非事務(wù)語義缭黔。
個人主頁: http://www.howardliu.cn
個人博文: storm筆記:Trident狀態(tài)
CSDN主頁: http://blog.csdn.net/liuxinghao
CSDN博文: storm筆記:Trident狀態(tài)