本人原創(chuàng)翻譯,轉載請注明出處
Storm提供了幾種不同級別的保證消息處理機制等缀,包括best effort, at least once, 通過Trident實現(xiàn)的exactly once佛寿。這篇文章描述了Storm如何保證at least once處理。
一個消息被完全處理(fully processed)究竟是什么意思?
一個tuple從spout中發(fā)出可能觸發(fā)成千上萬個tuples的創(chuàng)建讥脐。例如肠虽,單詞計數(shù)topology:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
22133,
"sentence_queue",
new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
.fieldsGrouping("split", new Fields("word"));
這個topology 從Kestrel隊列中讀取句子幔戏,把句子拆分成單詞組,然后每次emit一個單詞(如果單詞重復出現(xiàn)税课,那么出現(xiàn)多少次emit多少次)闲延。這解釋了一個tuple如何導致n個tuples被創(chuàng)建:句子中的每個單詞,都會成為一個單詞tuple和一個更新單詞計數(shù)的tuple韩玩。消息樹大概像這樣:
Storm定義一個從spout發(fā)出的tuple被完全處理垒玲,當且僅當tuple樹已經(jīng)為空并且樹中的每個消息都已被處理。如果tuple沒有在給定的超時時間(timeout)內(nèi)被完全處理找颓,就定義為處理失敗侍匙。timeout可以使用Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來配置,默認是30秒叮雳。
當一個消息被完全處理或沒有被完全處理時發(fā)生了什么想暗?
為了理解這個問題,讓我們看看tuple從spout開始的生命周期帘不。作為參考说莫,這里是spouts實現(xiàn)的接口:
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
首先,Storm通過調(diào)用Spout的nextTuple方法來請求一個tuple寞焙。Spout使用SpoutOutputCollector(在open函數(shù)中提供)來emit一個tuple到某個輸出流储狭。當emitting tuple的時候,Spout設置了一個"message id"捣郊,后續(xù)會用來識別tuple辽狈。舉個例子,KestrelSpout從kestrel隊列中讀取消息呛牲,由Kestrel給出id并設置為"message id"刮萌,然后emit。像這樣發(fā)出消息:
_collector.emit(new Values("field1", "field2", 3) , msgId);
接下來娘扩,tuple被發(fā)送給消費bolts着茸,Storm負責維護消息樹。如果Storm檢測到一個tuple被完全處理了琐旁,Storm會調(diào)用Spout的ack方法(攜帶參數(shù)message id)涮阔。同樣的,如果tuple處理超時灰殴,Storm將調(diào)用Spout的fail方法敬特。注意,一個tuple只會被創(chuàng)建它的那個Spout任務acked或failed,如果Spout被集群中的多個任務執(zhí)行伟阔,tuple不會被非創(chuàng)建它的任務acked或failed尸变。
再次以KestrelSpout為例來說明Spout如何保證消息處理。當KestrelSpout從Kestrel隊列中取出一個消息减俏,它"opens"這個消息召烂,消息并沒有真的從隊列中取下來,只是設置了一個掛起("pending")狀態(tài)娃承,等待消息處理完成的確認奏夫。處于掛起狀態(tài)的消息不會被發(fā)送給其他隊列消費者。此外历筝,如果一個客戶端失去連接酗昼,它的所有掛起狀態(tài)的消息會被放回隊列。KestrelSpout會給SpoutOutputCollector傳遞一個"message id"參數(shù)梳猪,稍后麻削,KestrelSpout的ack和fail函數(shù)被調(diào)用,KestrelSpout會給Kestrel發(fā)一個帶"message id"的ack或fail消息春弥,進而將消息移除或放回隊列呛哟。
什么是Storm的可靠性API?
要想利用Storm的可靠性能力要做兩件事匿沛。首先扫责,任何時候你在tuple樹中創(chuàng)建新的link都要通知Storm。其次逃呼,當你完成一個獨立tuple的處理時也要通知Storm鳖孤。通過做這兩件事,Storm可以檢測tuple樹是否處理完畢并恰當?shù)奶幚韘pout tuple的ack或fail抡笼。Storm的API提供了一種簡潔的方式來完成這些任務苏揣。
在tuple樹中指定一個link被稱作錨定(anchoring)。在你emit一個新的tuple時就同步完成了錨定推姻。舉個例子平匈,下面這個bolt把一個包含句子的tuple拆分成每個單詞的tuple:
public class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
每個單詞tuple通過指定輸入tuple為emit的第一個參數(shù)而錨定。由于單詞tuple已被錨定拾碌,在單詞tuple處理失敗的時候吐葱,tuple樹的根spout tuple將被重新傳輸街望。相反的校翔,讓我們看看如果像這樣emit tuple會發(fā)生什么:
_collector.emit(new Values(word));
這樣emit的單詞tuple沒有被錨定,如果tuple處理失敗灾前,根tuple不會被重傳防症。取決于你的容錯需求,有時候以非錨定的方式emit tuple也是恰當?shù)摹?/p>
一個輸出tuple可以被錨定到多個輸入tuple,這對流連接或流聚合(streaming joins or aggregations)很有用蔫敲。被多個輸入錨定的tuple處理失敗饲嗽,會導致多個根tuple重傳。例子:
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
多錨定(Multi-anchoring)將把輸出tuple加入到多個tuple樹奈嘿。注意貌虾,這可能會破壞樹的結構并且創(chuàng)建tuple 有向無環(huán)圖(DAGs)。例如:
Storm的實現(xiàn)支持有向無環(huán)圖和樹裙犹。
錨定就是你如何說明tuple樹——下一個也是最后一個關于Storm可靠性API的點是當你處理完一個獨立的tuple時尽狠,如何說明tuple樹。通過調(diào)用OutputCollector的ack和fail來實現(xiàn)這個操作叶圃。如果你往回看例子SplitSentence袄膏,你會看到在所有單詞tuple被emit之后輸入tuple被確認了(acked)。
你可以使用OutputCollector 的fail方法來立即使根tuple(spout tuple)失敗掺冠。例如沉馆,你的應用也許會選擇捕獲數(shù)據(jù)庫客戶端的異常,顯式的使輸入tuple失敗德崭。通過顯式的使tuple失敗斥黑,根tuple可以比等待超時更快的被重傳。
每個tuple都應該被ack或fail眉厨。Storm占用了內(nèi)存來跟蹤每一個tuple心赶,如果不ack/fail每個tuple,任務可能最終會耗盡內(nèi)存缺猛。
許多bolts使用了一種通用模式來讀取和發(fā)出輸入tuple缨叫,在execute方法的最后ack tuple。這些bolts歸類為過濾器和簡單函數(shù)(filters and simple functions)荔燎。Storm提供了一個BasicBolt接口封裝了這種模式耻姥,SplitSentence例子可以用BasicBolt實現(xiàn):
public class SplitSentence extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
這種實現(xiàn)比之前的實現(xiàn)簡單,語義上一致有咨。Tuples自動錨定到輸入tuple琐簇,execute方法完成時自動ack。
相反座享,實現(xiàn)聚合和連接的bolts可能會延遲ack婉商,直到一組tuples處理完畢。聚合和連接一般也會多錨定(multi-anchor)渣叛,IBasicBolt不能自動做這些丈秩。
如果tuples可以重傳,程序該如何正確工作淳衙?
軟件設計的一貫答案是“取決于”蘑秽。如果你一定要一個答案饺著,考慮使用Trident API。某些情況下肠牲,如要做很多分析并且可以容忍丟失數(shù)據(jù)幼衰,那么可以通過設置acker bolts為0(Config.TOPOLOGY_ACKERS)來禁用容錯。但在有些情況下缀雳,你想要確保每個數(shù)據(jù)都被至少處理了一次并且沒有丟失渡嚣。
Storm如何有效的實現(xiàn)可靠性?
Storm的topology有一些特殊的“acker”任務肥印,負責追蹤每個spout tuple的tuples DAG严拒,一旦acker發(fā)現(xiàn)DAG完成了,它就會發(fā)一個確認消息給spout竖独。你可以通過Config.TOPOLOGY_ACKERS設置acker任務的數(shù)量裤唠。Storm默認是每個worker有一個acker。
不管是spout還是bolt發(fā)出的tuple都有一個64位的id莹痢。每個tuple都知道tuple樹中的所有spout tuples的ids种蘸。當你發(fā)出一個新tuple時,老的tuple錨定的spout tuples ids被復制到新的tuple竞膳。當一個tuple被確認了航瞭,它會發(fā)一個tuple樹如何變更的消息給acker任務,特別地坦辟,消息可能像這樣:“我已經(jīng)完成了tuple樹中這個spout tuple的處理刊侯,樹中有一些新的tuples以我為錨”。
例如锉走,如果tuples “D”和“E”是基于tuple “C”而創(chuàng)建滨彻,當“C”確認時,tuple樹的變化如下:
由于在“D”和“E”創(chuàng)建的同時挪蹭,“C”被從樹中移除了亭饵,樹永遠不會過早的(prematurely)完成。注:這句不是很理解
還有一些細節(jié)要提一下梁厉。之前提到可以有多個acker任務辜羊,那么當一個tuple被確認時,如何知道由哪一個任務發(fā)送確認信息词顾?
Storm使用mod hashing來映射spout tuple id到acker任務八秃。由于每個tuple都攜帶了它所在所有樹中的spout tuple ids,因此知道該與哪個acker任務通信肉盹。
另一個細節(jié)是acker任務如何跟蹤spout任務昔驱。當spout task發(fā)出一個新tuple,它只是簡單的發(fā)送消息到恰當?shù)腶cker垮媒,告訴它為這個spout tuple負責舍悯。之后當一個acker發(fā)現(xiàn)樹已經(jīng)完成航棱,它就知道該給哪個任務id發(fā)完成信息睡雇。
acker任務不會顯式追蹤tuples樹萌衬。對于有好幾萬節(jié)點(甚至更多)的大tuple樹,跟蹤所有的tuple樹可能會造成內(nèi)存不夠用它抱。ackers采用一種策略秕豫,對每個spout tuple只要求固定數(shù)量的內(nèi)存(大約20字節(jié))。這個追蹤算法是理解Storm工作的關鍵观蓄,也是Storm主要的突破之一混移。
acker任務存儲了一個spout tuple到一組值的map。第一個值是任務id侮穿,用來發(fā)送完成信息歌径。第二個值是64位數(shù)字,名為“ack val”亲茅,這個值代表了整個tuple樹的狀態(tài)回铛,無論樹多大多小。它只是簡單的把樹中所有已創(chuàng)建或確認的tuple ids做xor運算克锣。
當一個acker任務發(fā)現(xiàn)“ack val”變成了0茵肃,它就知道tuple樹完成了。由于tuple ids是64位隨機數(shù)袭祟,“ack val”意外變成0的概率極小验残。用數(shù)學知識算一下,每秒10K個acks巾乳,大概要花50,000,000年才會發(fā)生一個錯誤您没。即使發(fā)生錯誤,也只是丟失數(shù)據(jù)胆绊。
現(xiàn)在你理解了可靠性算法紊婉,讓我們過一遍失敗的情形,看看每種情形下Storm如何避免數(shù)據(jù)丟失:
- 由于任務異常終止辑舷,tuple未被確認:這種情況下失敗tuple的樹根處的spout tuple將超時并重發(fā)喻犁。
- acker任務異常終止:這種情況下,所有這個akcer跟蹤的spout tuples都會超時并重發(fā)何缓。
- spout任務異常終止:這種情況下肢础,spout的源負責重發(fā)消息。例如碌廓,客戶端失去連接時传轰,像Kestrel和RabbitMQ這樣的隊列將把掛起的消息重新放回隊列。
如你所見谷婆,Storm的可靠性機制是完全分布式慨蛙、大規(guī)模和容錯的辽聊。
調(diào)教reliability
acker任務是輕量級的,所以在一個topology里不需要很多個acker期贫。你可以通過Storm UI(組件id“__acker”)跟蹤acker的性能跟匆,如果吞吐量不行,可以增加acker的數(shù)量通砍。
如果可靠性對你不重要——你不關心丟失tuples玛臂,那么你可以通過不追蹤tuple樹來增加性能。不追蹤tuple樹可以減半消息傳輸?shù)臄?shù)量封孙,另外迹冤,下游的tuple可以保存更少的ids,節(jié)省了網(wǎng)絡帶寬虎忌。
有三種方式可以移除可靠性泡徙。第一種是設置Config.TOPOLOGY_ACKERS為0。這種情況下膜蠢,Storm會在spout發(fā)出tuple時立即調(diào)用ack方法堪藐。
第二種是移除消息上的可靠性。你可以在調(diào)用SpoutOutputCollector.emit方法的時候不傳消息id狡蝶,這樣就關閉了對某個spout tuple的追蹤庶橱。
最后,如果你不關心下游tuples是否處理失敗贪惹,你可以在emit它們的時候不錨定它們苏章。由于它們沒有錨定到任何spout tuples上,它們沒被確認不會導致任何spout tuples失敗奏瞬。