本文列出 Storm 的幾個(gè)主要概念,并會(huì)給出相關(guān)資源的鏈接以便你獲取更多信息擒抛,概念主要如下:
- Topologies
- Streams
- Spouts
- Bolts
- Stream groupings
- Reliability
- Tasks
- Workers
拓?fù)洌═opologies)
實(shí)時(shí)應(yīng)用程序的邏輯被打包到 Storm 拓?fù)渲小R粋€(gè) Storm 拓?fù)漕愃朴谝粋€(gè) MapReduce 任務(wù)。關(guān)鍵的區(qū)別在于 MapReduce 任務(wù)最終會(huì)結(jié)束灭袁,而拓?fù)鋾?huì)一直運(yùn)行(當(dāng)然,除非你強(qiáng)制 kill 掉拓?fù)湎嚓P(guān)的進(jìn)程)窗看。拓?fù)淇梢岳斫鉃橥ㄟ^數(shù)據(jù)流(Stream Grouping)將 Spout 和 Bolt 相互連接而組成的圖狀結(jié)構(gòu)的程序茸歧。spouts 和 bolts 的概念會(huì)在下文介紹。
相關(guān)資源:
- TopologyBuilder: 構(gòu)造 topologies 的 Java 類
- 在生成環(huán)境的集群上運(yùn)行 topologies
- 本地模式: 學(xué)習(xí)如何在本地模式開發(fā)以及測(cè)試 topologies
流(Streams)
流是 Storm 的核心抽象显沈。Storm中软瞎,一個(gè)流指的是在分布式環(huán)境中被并行創(chuàng)建以及處理的元組(tuple)序列集逢唤。流是無限的元組(tuple)序列,以分布式方式并行創(chuàng)建和處理涤浇。流往往有固定的模式(我們稱之為“fields”)鳖藕,不同模式由不同的元組(tuple)類型以一定的方式組成。通常只锭,元組(tuple)可以包含 integers, longs, shorts, bytes, strings, doubles, floats, booleans, 以及 byte arrays著恩。當(dāng)然,你也可以通過定義可序列化的對(duì)象來實(shí)現(xiàn)自定義的元組類型蜻展。
相關(guān)資源:
- Tuple: 流由元組組成
- OutputFieldsDeclarer: 用來指定流以及它的模式
- Serialization: 元組的動(dòng)態(tài)類型以及自定義序列化
數(shù)據(jù)源(Spouts)
在拓?fù)渲校?spout 是流的來源喉誊。通常情況,Spouts 會(huì)從外部源(例如消息隊(duì)列或者 Twitter API)讀取數(shù)據(jù)并將數(shù)據(jù)發(fā)送到拓?fù)渲凶莨恕pouts 既可以是可靠的裹驰,也可以是不可靠的∑遥可靠的情況是如果數(shù)據(jù)流沒有被 Storm 處理幻林,Spouts 將重新發(fā)送數(shù)據(jù)。不可靠的情況則是對(duì)發(fā)送過的數(shù)據(jù)不予確認(rèn)音念。
Spouts 一次可以發(fā)送多個(gè)流沪饺。為了實(shí)現(xiàn)多流發(fā)送,我們可以使用(實(shí)現(xiàn)) OutputFieldsDeclarer 接口中的 declareStream 方法來指定多個(gè)流闷愤,并使用(實(shí)現(xiàn)) SpoutOutputCollector 接口中的 emit 方法進(jìn)行發(fā)送整葡。
nextTuple 是 Spouts 中的主要方法。nextTuple 方法要么發(fā)送一個(gè)新的元組到 topology 中讥脐,要么直接返回(如果沒有新的元組需要發(fā)送)遭居。需要注意的是,nextTuple 不應(yīng)該被 Spout 的任何其他方法所阻塞旬渠,否則會(huì)導(dǎo)致數(shù)據(jù)流的停止接入俱萍,這是因?yàn)?Spout 的所有方法是在一個(gè)線程中執(zhí)行。
ack 和 fail 是 Spouts 中另外兩個(gè)重要的方法告丢。Spouts 為可靠模式時(shí)枪蘑,Storm 會(huì)檢測(cè)每一個(gè)從 Spouts 發(fā)送出去的元組是否成功,成功調(diào)用 ack岖免,失敗調(diào)用 fail岳颇。當(dāng)然,在不可靠模式下颅湘,是不會(huì)調(diào)用這兩個(gè)方法的话侧。
相關(guān)資源:
- IRichSpout: 自定義的 spouts 必須實(shí)現(xiàn)這個(gè)接口
- 消息的可靠性處理
處理組件(Bolts)
topologies 所有的處理都是在 bolts 中進(jìn)行。bolts 可以做很多事情闯参,例如:過濾流瞻鹏、邏輯處理术羔、聚合、連接乙漓、數(shù)據(jù)庫交互等等级历。
bolts 可以從事簡(jiǎn)單的數(shù)據(jù)流轉(zhuǎn)換。處理復(fù)雜的數(shù)據(jù)流轉(zhuǎn)換通常需要將流程分成多步叭披,這也就意味著我們可以使用多類(個(gè)) bolt寥殖。例如,從微博數(shù)據(jù)流中得出一個(gè)趨勢(shì)圖涩蜘,實(shí)現(xiàn)這個(gè)需求我們至少需要兩步:第一個(gè) bolt 計(jì)算每個(gè)圖片的點(diǎn)擊數(shù)嚼贡,第二個(gè) bolt 在第一個(gè)基礎(chǔ)上得出 TOP X 的圖片(當(dāng)然為了流程可擴(kuò)展,我們可以使用更多的 bolt,不僅限于兩個(gè))同诫。
bolts 一次可以發(fā)送多個(gè)流粤策。為了實(shí)現(xiàn)多流發(fā)送,我們可以使用(實(shí)現(xiàn)) OutputFieldsDeclarer 接口中的 declareStream 方法來指定多個(gè)流误窖,并使用(實(shí)現(xiàn)) OutputCollector 接口中的 emit 方法進(jìn)行發(fā)送(跟 spout 類似)叮盘。
在定義 Bolt 的輸入數(shù)據(jù)流時(shí),你需要從其他的 Storm 組件中訂閱指定的數(shù)據(jù)流霹俺。如果你需要從其他所有的組件中訂閱數(shù)據(jù)流柔吼,你就必須要在定義 Bolt 時(shí)分別注冊(cè)每一個(gè)組件。對(duì)于聲明為默認(rèn) id 的數(shù)據(jù)流丙唧,InputDeclarer 接口有訂閱此類數(shù)據(jù)流的語法糖愈魏。調(diào)用 <font color=orange size=2 >declarer.shuffleGrouping("1") </font> 將訂閱來自 id 為“1” 的組件(spout/bolt)產(chǎn)生的數(shù)據(jù)流,其等價(jià)于調(diào)用 <font color=orange size=2 >declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)</font>
execute 是 bolt 的主要方法想际,它接收新的元組作為輸入培漏。bolt 使用 OutputCollector 對(duì)象來發(fā)送新的元組。bolt 必須為每個(gè)經(jīng)由它處理的元組調(diào)用 OutputCollector 中的 ack 方法胡本,這樣以便 Storm 知道這些元組什么時(shí)候被處理完成(最終判斷對(duì)原始 spout 元組的響應(yīng)是否合適)牌柄。處理元組的一般情況是,我們可以發(fā)送多個(gè)元組或者直接不發(fā)送打瘪,然后響應(yīng)下一個(gè)輸入元組友鼻,我們可以實(shí)現(xiàn) IBasicBolt 接口來完成 bolt 操作傻昙。
我們可以在 bolt 任務(wù)中開啟一個(gè)新的線程來完成異步操作闺骚。OutputCollector 線程安全并且可以隨時(shí)被調(diào)用。
相關(guān)資源:
- IRichBolt: bolt 的主要接口.
- IBasicBolt: 另外一個(gè)簡(jiǎn)單接口妆档,幫助用實(shí)現(xiàn)過濾以及其他功能
- OutputCollector: 使用這個(gè)類來發(fā)送元組到輸出流中
- 消息的可靠性處理
流分組(Stream groupings)
定義一個(gè) topology 的重要一部分是指定每個(gè) bolt 應(yīng)該接收哪些流作為輸入僻爽。流分組(stream grouping)定義了流如何分發(fā)到各個(gè) bolt 中。
Storm 提供了 8 種流分組策略贾惦。當(dāng)然胸梆,你也可以通過實(shí)現(xiàn) CustomStreamGrouping 接口來實(shí)現(xiàn)一個(gè)用戶自定義的流分組:
- Shuffle grouping : 元組被隨機(jī)分發(fā)到各個(gè) bolt 任務(wù)中敦捧,也就是說每個(gè) bolt 接收到大致相同數(shù)目的元組。
- Fields grouping : 根據(jù)指定的 field 進(jìn)行分組 碰镜,同一個(gè) field 的值一定會(huì)被發(fā)送到同一個(gè) task 上兢卵。例如,如果流按照 "user-id" 這個(gè) field 進(jìn)行分組绪颖,那么相同的 "user-id" 值會(huì)進(jìn)入相同的任務(wù)(task)秽荤,如果不同,則進(jìn)入不同的任務(wù)柠横。
- Partial Key grouping : 與 Fields grouping 類似窃款,根據(jù)指定的 field 的一部分進(jìn)行分組分發(fā),能夠很好地實(shí)現(xiàn) load balance牍氛,將元組發(fā)送給下游的 bolt 對(duì)應(yīng)的 task晨继,特別是在存在數(shù)據(jù)傾斜的場(chǎng)景,使用 Partial Key grouping 能夠更好地提高資源利用率
- All grouping : 流復(fù)制到所有 bolt task 上搬俊。
- Global grouping: 所有的流都指向一個(gè) bolt 的同一個(gè) task紊扬,也就是Task ID最小的。
- None grouping : 使用這個(gè)分組唉擂,用戶不用關(guān)心流是如何進(jìn)行分組的珠月。目前,這個(gè)分組類似于 Shuffle grouping楔敌。不過未來 Storm 可能會(huì)考慮通過這種分組來讓 Bolt 和它所訂閱的 Spout 或 Bolt 在同一個(gè)線程中執(zhí)行啤挎。
- Direct grouping : 由 tupe 的生產(chǎn)者來決定發(fā)送給下游的哪一個(gè) bolt 的 task ,這個(gè)要在實(shí)際開發(fā)編寫 bolt 代碼的邏輯中進(jìn)行精確控制卵凑。
- Local or shuffle grouping : 如果目標(biāo) bolt 有1個(gè)或多個(gè) task 都在同一個(gè) worker 進(jìn)程對(duì)應(yīng)的 JVM 實(shí)例中庆聘,則 tuple 只發(fā)送給這些 task。
可靠性(Reliability)
Storm 保證每個(gè) spout 元組都能在拓?fù)渲斜惶幚砩茁Mㄟ^跟蹤由 Spout 發(fā)出的每個(gè)元組構(gòu)成的元組樹可以確定元組是否已經(jīng)完成處理伙判。每個(gè)拓?fù)涠加信c之相關(guān)的消息超時(shí)。如果在超時(shí)時(shí)間內(nèi)沒有檢測(cè)到元組是否被完整處理黑忱,該原則將會(huì)被標(biāo)記并重新發(fā)送宴抚。
想要使用 Storm 這個(gè)可靠性功能,你必須在元組創(chuàng)建以及處理完成時(shí)告訴 Storm甫煞。你可以使用用于發(fā)送數(shù)據(jù)流的 OutputCollector 對(duì)象菇曲,并使用 ack 方法表明你已經(jīng)完成了元組的處理。
任務(wù)(Tasks)
集群中抚吠,每一個(gè) spout 和 bolt 運(yùn)行了多個(gè)任務(wù)常潮。每個(gè)任務(wù)對(duì)應(yīng)一個(gè)執(zhí)行線程,流分組定義如何將元組從一組任務(wù)發(fā)送到另一組任務(wù)楷力。你可以使用 TopologyBuilder 中的 setSpout 和 setBolt 方法來設(shè)置任務(wù)并行度喊式。
Workers
一個(gè)拓?fù)渲羞\(yùn)行了一個(gè)或多個(gè) worker 進(jìn)程孵户。每個(gè)進(jìn)程都是一個(gè)物理 JVM,并且拓?fù)渲械乃?task 都在這些進(jìn)程中執(zhí)行岔留。例如夏哭,如果并行度為 300,我們有 50 個(gè)worker 進(jìn)程献联,那么每個(gè)進(jìn)程將處理 6 個(gè) task方庭。Storm 有其機(jī)制致力于將所有任務(wù)盡量平均地分配到每個(gè)進(jìn)程中。
相關(guān)資源:
Config.TOPOLOGY_WORKERS: 設(shè)置 worker 數(shù)量的配置