Storm基礎(四)保證消息處理

原文鏈接:Guaranteeing Message Processing

本人原創(chuàng)翻譯,轉載請注明出處

Storm提供了幾種不同級別的保證消息處理機制等缀,包括best effort, at least once, 通過Trident實現(xiàn)的exactly once佛寿。這篇文章描述了Storm如何保證at least once處理。

一個消息被完全處理(fully processed)究竟是什么意思?

一個tuple從spout中發(fā)出可能觸發(fā)成千上萬個tuples的創(chuàng)建讥脐。例如肠虽,單詞計數(shù)topology:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
                                               22133,
                                               "sentence_queue",
                                               new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));

這個topology 從Kestrel隊列中讀取句子幔戏,把句子拆分成單詞組,然后每次emit一個單詞(如果單詞重復出現(xiàn)税课,那么出現(xiàn)多少次emit多少次)闲延。這解釋了一個tuple如何導致n個tuples被創(chuàng)建:句子中的每個單詞,都會成為一個單詞tuple和一個更新單詞計數(shù)的tuple韩玩。消息樹大概像這樣:


Storm定義一個從spout發(fā)出的tuple被完全處理垒玲,當且僅當tuple樹已經(jīng)為空并且樹中的每個消息都已被處理。如果tuple沒有在給定的超時時間(timeout)內(nèi)被完全處理找颓,就定義為處理失敗侍匙。timeout可以使用Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來配置,默認是30秒叮雳。

當一個消息被完全處理或沒有被完全處理時發(fā)生了什么想暗?

為了理解這個問題,讓我們看看tuple從spout開始的生命周期帘不。作為參考说莫,這里是spouts實現(xiàn)的接口:

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先,Storm通過調(diào)用Spout的nextTuple方法來請求一個tuple寞焙。Spout使用SpoutOutputCollector(在open函數(shù)中提供)來emit一個tuple到某個輸出流储狭。當emitting tuple的時候,Spout設置了一個"message id"捣郊,后續(xù)會用來識別tuple辽狈。舉個例子,KestrelSpout從kestrel隊列中讀取消息呛牲,由Kestrel給出id并設置為"message id"刮萌,然后emit。像這樣發(fā)出消息:

_collector.emit(new Values("field1", "field2", 3) , msgId);

接下來娘扩,tuple被發(fā)送給消費bolts着茸,Storm負責維護消息樹。如果Storm檢測到一個tuple被完全處理了琐旁,Storm會調(diào)用Spout的ack方法(攜帶參數(shù)message id)涮阔。同樣的,如果tuple處理超時灰殴,Storm將調(diào)用Spout的fail方法敬特。注意,一個tuple只會被創(chuàng)建它的那個Spout任務acked或failed,如果Spout被集群中的多個任務執(zhí)行伟阔,tuple不會被非創(chuàng)建它的任務acked或failed尸变。

再次以KestrelSpout為例來說明Spout如何保證消息處理。當KestrelSpout從Kestrel隊列中取出一個消息减俏,它"opens"這個消息召烂,消息并沒有真的從隊列中取下來,只是設置了一個掛起("pending")狀態(tài)娃承,等待消息處理完成的確認奏夫。處于掛起狀態(tài)的消息不會被發(fā)送給其他隊列消費者。此外历筝,如果一個客戶端失去連接酗昼,它的所有掛起狀態(tài)的消息會被放回隊列。KestrelSpout會給SpoutOutputCollector傳遞一個"message id"參數(shù)梳猪,稍后麻削,KestrelSpout的ack和fail函數(shù)被調(diào)用,KestrelSpout會給Kestrel發(fā)一個帶"message id"的ack或fail消息春弥,進而將消息移除或放回隊列呛哟。

什么是Storm的可靠性API?

要想利用Storm的可靠性能力要做兩件事匿沛。首先扫责,任何時候你在tuple樹中創(chuàng)建新的link都要通知Storm。其次逃呼,當你完成一個獨立tuple的處理時也要通知Storm鳖孤。通過做這兩件事,Storm可以檢測tuple樹是否處理完畢并恰當?shù)奶幚韘pout tuple的ack或fail抡笼。Storm的API提供了一種簡潔的方式來完成這些任務苏揣。

在tuple樹中指定一個link被稱作錨定(anchoring)。在你emit一個新的tuple時就同步完成了錨定推姻。舉個例子平匈,下面這個bolt把一個包含句子的tuple拆分成每個單詞的tuple:

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

每個單詞tuple通過指定輸入tuple為emit的第一個參數(shù)而錨定。由于單詞tuple已被錨定拾碌,在單詞tuple處理失敗的時候吐葱,tuple樹的根spout tuple將被重新傳輸街望。相反的校翔,讓我們看看如果像這樣emit tuple會發(fā)生什么:

_collector.emit(new Values(word));

這樣emit的單詞tuple沒有被錨定,如果tuple處理失敗灾前,根tuple不會被重傳防症。取決于你的容錯需求,有時候以非錨定的方式emit tuple也是恰當?shù)摹?/p>

一個輸出tuple可以被錨定到多個輸入tuple,這對流連接或流聚合(streaming joins or aggregations)很有用蔫敲。被多個輸入錨定的tuple處理失敗饲嗽,會導致多個根tuple重傳。例子:

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

多錨定(Multi-anchoring)將把輸出tuple加入到多個tuple樹奈嘿。注意貌虾,這可能會破壞樹的結構并且創(chuàng)建tuple 有向無環(huán)圖(DAGs)。例如:


Tuple DAG

Storm的實現(xiàn)支持有向無環(huán)圖和樹裙犹。

錨定就是你如何說明tuple樹——下一個也是最后一個關于Storm可靠性API的點是當你處理完一個獨立的tuple時尽狠,如何說明tuple樹。通過調(diào)用OutputCollector的ack和fail來實現(xiàn)這個操作叶圃。如果你往回看例子SplitSentence袄膏,你會看到在所有單詞tuple被emit之后輸入tuple被確認了(acked)。

你可以使用OutputCollector 的fail方法來立即使根tuple(spout tuple)失敗掺冠。例如沉馆,你的應用也許會選擇捕獲數(shù)據(jù)庫客戶端的異常,顯式的使輸入tuple失敗德崭。通過顯式的使tuple失敗斥黑,根tuple可以比等待超時更快的被重傳。

每個tuple都應該被ack或fail眉厨。Storm占用了內(nèi)存來跟蹤每一個tuple心赶,如果不ack/fail每個tuple,任務可能最終會耗盡內(nèi)存缺猛。

許多bolts使用了一種通用模式來讀取和發(fā)出輸入tuple缨叫,在execute方法的最后ack tuple。這些bolts歸類為過濾器和簡單函數(shù)(filters and simple functions)荔燎。Storm提供了一個BasicBolt接口封裝了這種模式耻姥,SplitSentence例子可以用BasicBolt實現(xiàn):

public class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

這種實現(xiàn)比之前的實現(xiàn)簡單,語義上一致有咨。Tuples自動錨定到輸入tuple琐簇,execute方法完成時自動ack。
相反座享,實現(xiàn)聚合和連接的bolts可能會延遲ack婉商,直到一組tuples處理完畢。聚合和連接一般也會多錨定(multi-anchor)渣叛,IBasicBolt不能自動做這些丈秩。

如果tuples可以重傳,程序該如何正確工作淳衙?

軟件設計的一貫答案是“取決于”蘑秽。如果你一定要一個答案饺著,考慮使用Trident API。某些情況下肠牲,如要做很多分析并且可以容忍丟失數(shù)據(jù)幼衰,那么可以通過設置acker bolts為0(Config.TOPOLOGY_ACKERS)來禁用容錯。但在有些情況下缀雳,你想要確保每個數(shù)據(jù)都被至少處理了一次并且沒有丟失渡嚣。

Storm如何有效的實現(xiàn)可靠性?

Storm的topology有一些特殊的“acker”任務肥印,負責追蹤每個spout tuple的tuples DAG严拒,一旦acker發(fā)現(xiàn)DAG完成了,它就會發(fā)一個確認消息給spout竖独。你可以通過Config.TOPOLOGY_ACKERS設置acker任務的數(shù)量裤唠。Storm默認是每個worker有一個acker。

不管是spout還是bolt發(fā)出的tuple都有一個64位的id莹痢。每個tuple都知道tuple樹中的所有spout tuples的ids种蘸。當你發(fā)出一個新tuple時,老的tuple錨定的spout tuples ids被復制到新的tuple竞膳。當一個tuple被確認了航瞭,它會發(fā)一個tuple樹如何變更的消息給acker任務,特別地坦辟,消息可能像這樣:“我已經(jīng)完成了tuple樹中這個spout tuple的處理刊侯,樹中有一些新的tuples以我為錨”。

例如锉走,如果tuples “D”和“E”是基于tuple “C”而創(chuàng)建滨彻,當“C”確認時,tuple樹的變化如下:

由于在“D”和“E”創(chuàng)建的同時挪蹭,“C”被從樹中移除了亭饵,樹永遠不會過早的(prematurely)完成。注:這句不是很理解

還有一些細節(jié)要提一下梁厉。之前提到可以有多個acker任務辜羊,那么當一個tuple被確認時,如何知道由哪一個任務發(fā)送確認信息词顾?

Storm使用mod hashing來映射spout tuple id到acker任務八秃。由于每個tuple都攜帶了它所在所有樹中的spout tuple ids,因此知道該與哪個acker任務通信肉盹。

另一個細節(jié)是acker任務如何跟蹤spout任務昔驱。當spout task發(fā)出一個新tuple,它只是簡單的發(fā)送消息到恰當?shù)腶cker垮媒,告訴它為這個spout tuple負責舍悯。之后當一個acker發(fā)現(xiàn)樹已經(jīng)完成航棱,它就知道該給哪個任務id發(fā)完成信息睡雇。

acker任務不會顯式追蹤tuples樹萌衬。對于有好幾萬節(jié)點(甚至更多)的大tuple樹,跟蹤所有的tuple樹可能會造成內(nèi)存不夠用它抱。ackers采用一種策略秕豫,對每個spout tuple只要求固定數(shù)量的內(nèi)存(大約20字節(jié))。這個追蹤算法是理解Storm工作的關鍵观蓄,也是Storm主要的突破之一混移。

acker任務存儲了一個spout tuple到一組值的map。第一個值是任務id侮穿,用來發(fā)送完成信息歌径。第二個值是64位數(shù)字,名為“ack val”亲茅,這個值代表了整個tuple樹的狀態(tài)回铛,無論樹多大多小。它只是簡單的把樹中所有已創(chuàng)建或確認的tuple ids做xor運算克锣。

當一個acker任務發(fā)現(xiàn)“ack val”變成了0茵肃,它就知道tuple樹完成了。由于tuple ids是64位隨機數(shù)袭祟,“ack val”意外變成0的概率極小验残。用數(shù)學知識算一下,每秒10K個acks巾乳,大概要花50,000,000年才會發(fā)生一個錯誤您没。即使發(fā)生錯誤,也只是丟失數(shù)據(jù)胆绊。

現(xiàn)在你理解了可靠性算法紊婉,讓我們過一遍失敗的情形,看看每種情形下Storm如何避免數(shù)據(jù)丟失:

  • 由于任務異常終止辑舷,tuple未被確認:這種情況下失敗tuple的樹根處的spout tuple將超時并重發(fā)喻犁。
  • acker任務異常終止:這種情況下,所有這個akcer跟蹤的spout tuples都會超時并重發(fā)何缓。
  • spout任務異常終止:這種情況下肢础,spout的源負責重發(fā)消息。例如碌廓,客戶端失去連接時传轰,像Kestrel和RabbitMQ這樣的隊列將把掛起的消息重新放回隊列。

如你所見谷婆,Storm的可靠性機制是完全分布式慨蛙、大規(guī)模和容錯的辽聊。

調(diào)教reliability

acker任務是輕量級的,所以在一個topology里不需要很多個acker期贫。你可以通過Storm UI(組件id“__acker”)跟蹤acker的性能跟匆,如果吞吐量不行,可以增加acker的數(shù)量通砍。

如果可靠性對你不重要——你不關心丟失tuples玛臂,那么你可以通過不追蹤tuple樹來增加性能。不追蹤tuple樹可以減半消息傳輸?shù)臄?shù)量封孙,另外迹冤,下游的tuple可以保存更少的ids,節(jié)省了網(wǎng)絡帶寬虎忌。

有三種方式可以移除可靠性泡徙。第一種是設置Config.TOPOLOGY_ACKERS為0。這種情況下膜蠢,Storm會在spout發(fā)出tuple時立即調(diào)用ack方法堪藐。

第二種是移除消息上的可靠性。你可以在調(diào)用SpoutOutputCollector.emit方法的時候不傳消息id狡蝶,這樣就關閉了對某個spout tuple的追蹤庶橱。

最后,如果你不關心下游tuples是否處理失敗贪惹,你可以在emit它們的時候不錨定它們苏章。由于它們沒有錨定到任何spout tuples上,它們沒被確認不會導致任何spout tuples失敗奏瞬。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末枫绅,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子硼端,更是在濱河造成了極大的恐慌并淋,老刑警劉巖珍昨,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件县耽,死亡現(xiàn)場離奇詭異,居然都是意外死亡镣典,警方通過查閱死者的電腦和手機兔毙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來兄春,“玉大人澎剥,你說我怎么就攤上這事「嫌撸” “怎么了哑姚?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵祭饭,是天一觀的道長。 經(jīng)常有香客問我叙量,道長倡蝙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任宛乃,我火速辦了婚禮悠咱,結果婚禮上蒸辆,老公的妹妹穿的比我還像新娘征炼。我一直安慰自己,他們只是感情好躬贡,可當我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布谆奥。 她就那樣靜靜地躺著,像睡著了一般拂玻。 火紅的嫁衣襯著肌膚如雪酸些。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天檐蚜,我揣著相機與錄音魄懂,去河邊找鬼。 笑死闯第,一個胖子當著我的面吹牛市栗,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播咳短,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼填帽,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了咙好?” 一聲冷哼從身側響起篡腌,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎勾效,沒想到半個月后嘹悼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡层宫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年杨伙,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片卒密。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡缀台,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出哮奇,到底是詐尸還是另有隱情膛腐,我是刑警寧澤睛约,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站哲身,受9級特大地震影響辩涝,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜勘天,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一怔揩、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧脯丝,春花似錦商膊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至材蹬,卻和暖如春实幕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背堤器。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工昆庇, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人闸溃。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓整吆,卻偏偏與公主長得像,于是被迫代替她去往敵國和親圈暗。 傳聞我的和親對象是個殘疾皇子掂为,可洞房花燭夜當晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容