一、Storm簡(jiǎn)介
1.1 Storm是什么
Apache Storm(http://storm.apache.org)是由Twitter 開源的分布式實(shí)時(shí)計(jì)算系統(tǒng),Storm 可以非常容易并且可靠的處理無(wú)線的數(shù)據(jù)流,對(duì)比Hadoop的批處理寺董,Storm是一個(gè)實(shí)時(shí)的、分布式的、具備高容錯(cuò)的計(jì)算系統(tǒng)衩婚。
1.2 Storm官網(wǎng)
http://storm.apache.org/
1.3 Storm原理
分布式的實(shí)時(shí)計(jì)算系統(tǒng),能夠可信任的處理大量的流式數(shù)據(jù)效斑,就好比Hadoop對(duì)于批量數(shù)據(jù)進(jìn)行的處理一樣非春;通常來(lái)說(shuō),Hadoop能夠進(jìn)行大批量數(shù)據(jù)的離線處理缓屠,但是在實(shí)時(shí)計(jì)算上的表現(xiàn)實(shí)在是不盡如人意奇昙;而Storm就可以擔(dān)當(dāng)這部分的作用。
1.4 Storm核心概念
Storm計(jì)算結(jié)構(gòu)中的幾個(gè)核心概念為 topology敌完,stream储耐,spout,bolt滨溉,下面我們將依次介紹什湘。
(1)Topology
Topology是 storm 中最核心的概念,其是運(yùn)行在 storm 集群上的一個(gè)實(shí)時(shí)計(jì)算應(yīng)用业踏,相當(dāng)于 hadoop 中的一個(gè) job禽炬,區(qū)別于 job 的時(shí),job 會(huì)有明確的開始和結(jié)束勤家,而 topology 由于實(shí)時(shí)的流式計(jì)算的特殊性腹尖,從啟動(dòng)的那一刻起會(huì)永遠(yuǎn)的運(yùn)行下去,直到手動(dòng)停止伐脖。
Topology由 stream热幔,spouts,bolts 組成讼庇,可以描述為一個(gè)有向無(wú)環(huán)圖绎巨,如下:圖一 topology 示例
(2)Stream
Stream是 storm 中對(duì)數(shù)據(jù)流的抽象,是由無(wú)限制的 tuple 組成的序列蠕啄。Tuple 可以理解為包含一個(gè)或多個(gè)鍵值對(duì)的 hash场勤。Tuples 在 stream 中流經(jīng) bolts戈锻,被逐步處理,最終得到預(yù)設(shè)的結(jié)果和媳。
Stream可比作一條源源不絕的河流格遭,tuple 就是組成這條河流的無(wú)數(shù)水滴。每一個(gè) stream 在 storm 中都有一個(gè)唯一標(biāo)示的 id留瞳。
(3)Spout
從圖一可以看出拒迅,spout是一個(gè) topology 的數(shù)據(jù)源,負(fù)責(zé)連接數(shù)據(jù)源她倘,并將數(shù)據(jù)轉(zhuǎn)化為 tuple emit 到 topology中璧微,經(jīng)由 bolts 處理。
Spout提供了一對(duì)核心方法<ack, fail>來(lái)保障 storm 在數(shù)據(jù)沒有被正確處理的情況下硬梁,不會(huì)被丟棄前硫,仍能被重新處理,當(dāng)然這是可選的靶溜,我們也可以不關(guān)心 tuple 是否被正確的處理开瞭,只負(fù)責(zé)向topology 中 emit 數(shù)據(jù)(在某些場(chǎng)景下可能不需要)。具體實(shí)現(xiàn)原理在后文會(huì)詳細(xì)介紹罩息。
Storm + Kakfa是很常見的組合嗤详,storm提供了storm-kafka擴(kuò)展,封裝了多個(gè)可用的 kafka spouts 供直接使用瓷炮,相關(guān)文檔可以參考這里葱色。
(4)Bolt
Bolt是 topology 中的數(shù)據(jù)處理單元,每個(gè) bolt 都會(huì)對(duì) stream 中的 tuple 進(jìn)行數(shù)據(jù)處理娘香。復(fù)雜的數(shù)據(jù)處理邏輯一般拆分成多個(gè)簡(jiǎn)單的處理邏輯交由每個(gè) Bolt 負(fù)責(zé)苍狰。
Bolt可以執(zhí)行豐富的數(shù)據(jù)處理邏輯,如過(guò)濾烘绽,聚合淋昭,鏈接,數(shù)據(jù)庫(kù)操作等等安接。
Bolt可以接受任意個(gè)數(shù)據(jù)流中的 tuples翔忽,并在對(duì)數(shù)據(jù)進(jìn)行處理后選擇性的輸出到多個(gè)流中。也就是說(shuō)盏檐,bolt 可以訂閱任意數(shù)量的spouts 或其他 bolts emit 的數(shù)據(jù)流歇式,這樣最終形成了復(fù)雜的數(shù)據(jù)流處理網(wǎng)絡(luò),如圖一胡野。
理解了storm的核心概念后材失,下文將介紹storm的并發(fā)機(jī)制
1.5 Storm的基本架構(gòu)
(1)Nimbus:如上圖,就好比Hadoop中的JobTracker硫豆,是集群中的主節(jié)點(diǎn)龙巨,負(fù)責(zé)分發(fā)用戶代碼笼呆,把需要處理的任務(wù)指派給具體的Supervisor,再由其上的Worker進(jìn)行實(shí)際的處理旨别。
(2)Supervisor:集群中的從節(jié)點(diǎn)抄邀,負(fù)責(zé)管理機(jī)器上運(yùn)行的Worker進(jìn)程,這里昼榛,需要注意,worker是一個(gè)進(jìn)程剔难,其內(nèi)部還可以啟動(dòng)多個(gè)線程來(lái)進(jìn)行任務(wù)的處理胆屿;通常,我們?cè)僦付ǖ臅r(shí)候偶宫,會(huì)在此處通過(guò)指定端口號(hào)非迹,來(lái)指定機(jī)器上到底啟動(dòng)多少個(gè)worker。
(3)Zookeeper:基本只要牽涉到集群纯趋,都需要用到zookeeper憎兽,這也符合其作為動(dòng)物園管理員的職責(zé),通過(guò)zookeeper吵冒,nimbus會(huì)感知到Supervisor的下線和上線纯命,會(huì)合理分配資源,完成Topology的處理
(4)Topology:這就好比我們平時(shí)提交的一個(gè)Application痹栖,只是換了一個(gè)名稱而已亿汞。
1.7 Storm與Spark的區(qū)別
其實(shí),這里更應(yīng)該說(shuō)是Spark-Streaming與storm的區(qū)別揪阿,因?yàn)閟park目前也在朝著打造一個(gè)生態(tài)圈的目標(biāo)而努力疗我,擁有spark-sql,能夠?qū)崿F(xiàn)類似Hive的數(shù)據(jù)倉(cāng)庫(kù)管理南捂;而Saprk-Streaming吴裤,則是用來(lái)進(jìn)行實(shí)時(shí)處理,類似于Storm的功能溺健;二者實(shí)現(xiàn)的功能相似麦牺,但實(shí)際上還是有些區(qū)別的。
(1)實(shí)時(shí)性來(lái)說(shuō)矿瘦,Storm的實(shí)時(shí)性更強(qiáng)枕面,基本上就是來(lái)一條數(shù)據(jù),就處理一條數(shù)據(jù)缚去;在編寫Spark代碼的時(shí)候潮秘,會(huì)發(fā)現(xiàn),其本身就是收集一段時(shí)間的數(shù)據(jù)來(lái)進(jìn)行統(tǒng)一處理易结,雖然可以盡可能縮小這個(gè)時(shí)間枕荞,但如果數(shù)據(jù)瞬間涌入過(guò)多的話柜候,其性能相比于Storm還是有些不足的。
(2)健壯性來(lái)說(shuō)躏精,Storm的實(shí)現(xiàn)中使用了zookeeper來(lái)實(shí)現(xiàn)渣刷,而且還有Ack機(jī)制,對(duì)于數(shù)據(jù)是否處理成功能夠感知到而Spark則是采取了業(yè)界常用的WAL矗烛,即預(yù)寫日志和CheckPoint機(jī)制辅柴,相比之下,健壯性要差一些
(3)并行度的適時(shí)調(diào)整:對(duì)于一個(gè)公司來(lái)說(shuō)瞭吃,業(yè)務(wù)肯定會(huì)存在高峰期和低谷期碌嘀,所以storm能夠動(dòng)態(tài)調(diào)整實(shí)時(shí)計(jì)算程序的并行度,能夠最大限度利用集群資源歪架,這點(diǎn)也很棒股冗;而Spark是實(shí)現(xiàn)不了的。
(4)但是和蚪,Spark最好的一點(diǎn)在于止状,其吞吐量比較大,而且Spark-Streaming位于Spark生態(tài)圈中攒霹,如果想要加入許多的附加功能怯疤,可以用Spark自己的組件就能夠?qū)崿F(xiàn)無(wú)縫對(duì)接,這一點(diǎn)是Storm無(wú)法相比的剔蹋,因?yàn)镾torm就是專門用于做實(shí)時(shí)處理的旅薄,其他功能的實(shí)現(xiàn),肯定性能要差一些泣崩。
二少梁、Storm核心
2.1 Storm的并發(fā)
上文提到storm是 scalable 的,是因?yàn)?storm 能將計(jì)算切分成多個(gè)獨(dú)立的 tasks 在集群上并發(fā)執(zhí)行矫付,從而支持其在多臺(tái)設(shè)備水平擴(kuò)容凯沪。那 storm 的并發(fā)是如何實(shí)現(xiàn)的呢?回答這個(gè)問題之前先來(lái)看一下 topology 是如何運(yùn)行在 storm 集群中的:
上圖中包含三個(gè)核心概念:
(1)worker: 一個(gè) worker 對(duì)應(yīng)一個(gè)進(jìn)程买优,是一個(gè) topology 的子集妨马,在 storm 集群中的一個(gè)node上可根據(jù)配置啟動(dòng)N個(gè) worker。
(2)Executor:一個(gè) executor 是運(yùn)行在一個(gè) worker 進(jìn)程上的線程杀赢,executor 可以執(zhí)行同一個(gè) spout 或 bolt 的一個(gè)或多個(gè) task 烘跺,默認(rèn)的一個(gè) executor 會(huì)分配一個(gè) task。
(3)Task:task負(fù)責(zé)真正的數(shù)據(jù)處理邏輯脂崔,一個(gè) task 實(shí)質(zhì)上是一個(gè)spout 或者 bolt 的實(shí)例滤淳。
所以,一個(gè)物理設(shè)備上可以運(yùn)行多個(gè)worker砌左,一個(gè) worker 內(nèi)部又可以啟動(dòng)多個(gè) executor 脖咐,每個(gè) executor 可以執(zhí)行一個(gè)或多個(gè)task铺敌。
Strom的并發(fā)度是用來(lái)描述所謂的 "parallelism hint",它是指一個(gè) component(spout or bolt)的初始啟動(dòng)時(shí)的 executor 數(shù)量屁擅。通過(guò)下圖我們來(lái)看一個(gè) topology 的并發(fā)示例:
上圖的topology有一個(gè) spout 和兩個(gè) bolt 組成偿凭。其中 blue spout 包含兩個(gè) executor,每個(gè) executor 各執(zhí)行一個(gè) blue spout 的 task派歌;green bolt 包含了兩個(gè) executor弯囊,每個(gè) executor 各執(zhí)行兩個(gè)task;yellow bolt 包含6個(gè) executor胶果,每個(gè) executor 各執(zhí)行一個(gè)task常挚。
整個(gè)topology啟動(dòng)了兩個(gè) worker,共包含 12 個(gè)task稽物,每個(gè)worker 包含5個(gè) executor,也就是5個(gè) Thread折欠。所以其 parallelism hint 是10贝或。
從上例可以看出,增加分配給topology的 worker 數(shù)和 executor
數(shù)是直接增加其計(jì)算能的簡(jiǎn)單辦法锐秦。Storm提供了相關(guān)的 API 或通過(guò)配置文件來(lái)修改一個(gè) topology 的 woker 數(shù)咪奖,同樣的
storm提供了相關(guān) API 控制 executor 的數(shù)量和每個(gè) executor執(zhí)行的 task 數(shù)量用以控制并發(fā)。
2.2 Stream grouping數(shù)據(jù)分組
除了spout和 bolt外酱床,定義一個(gè) topology 還有一個(gè)重要的組成羊赵,那就是 stream grouping,它規(guī)定了 topology 中的每一個(gè) bolt 實(shí)例(也即是task)要接收什么樣的 stream 作為輸入扇谣。
具體來(lái)說(shuō)昧捷,stream group定義了一個(gè) stream 中的 tuple 最終被emit 到哪個(gè) bolt task 上被處理,是一個(gè)數(shù)據(jù)分組機(jī)制罐寨。storm 提供了八種內(nèi)置的 stream grouping 類型(storm 1.o.x版本的內(nèi)置類型靡挥,):
(1)Shuffle grouping : 隨機(jī)分組,隨機(jī)的分發(fā) tuple 到每個(gè) bolt 的各個(gè) task鸯绿,每個(gè) task 接收的 tuples 數(shù)量相同跋破。
(2)Fields grouping : 按字段分組,會(huì)根據(jù) tuple 的 某一個(gè)字段(可以理解為 tuple 這個(gè) hash 的 key)分組瓶蝴,同一個(gè)字段的 tuple 永遠(yuǎn)被分配給同一個(gè) task 處理毒返。
(3)Partial Key grouping : 類似2,但實(shí)現(xiàn)了 stream 下游的兩個(gè)
(4)bolts 間的負(fù)載均衡舷手,在 tuple 的字段分布不均勻時(shí)提供了更好的資源利用效果拧簸。
(5)All grouping : 全復(fù)制分組,所有的 tuple 復(fù)制后聚霜,都會(huì)分發(fā)給所有的 bolt 的 task 進(jìn)行處理狡恬。
(6)Global grouping : 全局分組珠叔,所有的 tuples 都 emit 到唯一的一個(gè) task 上,如果為一個(gè) bolt 設(shè)置了多個(gè) task弟劲,會(huì)選擇 task id 最小的 task 來(lái)接收數(shù)據(jù)祷安,此時(shí)設(shè)置的并發(fā)是沒有意義的。
(7)None grouping : 不分組兔乞,功能上同1汇鞭,是預(yù)留接口。
(8)Direct grouping : 指定分組庸追,數(shù)據(jù)源會(huì)調(diào)用 emitDerect 方法來(lái)判斷一個(gè) tuple 將發(fā)送到哪個(gè) cosumer task 來(lái)接收這個(gè) tuple霍骄。這種分組只能在?被聲明為指向性的數(shù)據(jù)流上使用。
(9)Local or shuffle grouping : 本地隨機(jī)分組淡溯,和1類似读整,但是在隨機(jī)分組的過(guò)程中會(huì),如果在同一個(gè) woker 內(nèi)包含 consumer task咱娶,則在 woker 內(nèi)部的 consumer tasks 中進(jìn)行隨機(jī)分組米间,否則同1。
(10)另外膘侮,可以通過(guò)擴(kuò)展CustomStreamGrouping實(shí)現(xiàn)自定義的分組方式屈糊。
2.3 Storm可靠性分類
在這之前,我們需要介紹一個(gè)概念"fully processed"琼了。一條message 自從它由 spout emit 到 topology逻锐,被這個(gè) tuple 途徑的整個(gè)?DAG 中的所有 bolt 都處理過(guò),storm 認(rèn)為這個(gè) message 是被 "fully processed"雕薪。Storm 的消息保障處理機(jī)制是針對(duì) "fully processed" 而言的昧诱。
在系統(tǒng)級(jí),storm提供了 "best effort"所袁,"at least once"鳄哭,"exactly once" 三種類型。其中 "best effort" 是不保證每條消息都被處理纲熏,"at least once" 是保障消息最少能被處理一次妆丘,可能會(huì)被多次處理,"exactly once" 是保證消息被處理且只被處理一次局劲。
"best effort"這種類型沒什么可說(shuō)的勺拣,就是每條消息 storm 都會(huì)按程序邏輯走下去,但是不會(huì)關(guān)注其是否成功鱼填。"at least once"药有,是storm-core 提供的可靠性級(jí)別,即保證每條 message 至少會(huì)被處理一次,可能會(huì)出現(xiàn)多次處理的情況愤惰,下文將詳細(xì)介紹其實(shí)現(xiàn)原理苇经。
至于"exactly once"其實(shí)是由 storm 的高級(jí)抽象 Trident 實(shí)現(xiàn)的,我們會(huì)在后文對(duì)其介紹宦言。
2.4 Storm實(shí)現(xiàn)可靠性的API
現(xiàn)在扇单,我們介紹一下storm保證可靠性的實(shí)現(xiàn)接口。在 storm 中要保障消息被處理你需要做以下兩件事才能保證 spout 發(fā)出 tuple 被處理:
(1)無(wú)論在什么節(jié)點(diǎn)奠旺,每當(dāng)你新創(chuàng)建一個(gè) tuple 是都要告知 storm
(2)無(wú)論在什么節(jié)點(diǎn)蜘澜,每當(dāng)你處理完成一個(gè) tuple 都需要告知 storm
對(duì)于spout,storm的提供了非常簡(jiǎn)單的API保證可靠性:
(1)nextTuple:這個(gè)接口負(fù)責(zé)emit tuple响疚,為了保證可靠性需要為每個(gè) tuple 生成一個(gè)唯一 ID鄙信,在通過(guò) collector emit tuple 時(shí),是需要帶上這個(gè) ID忿晕。同時(shí)會(huì)將這個(gè) tuple 和 ID 保存在一個(gè) hash 中装诡,以等待 tuple 被完全處理后相應(yīng)的操作.
(2)ack:這個(gè)接口負(fù)責(zé)處理成功的應(yīng)答,一般當(dāng)收到成功處理這個(gè)tuple 的消息后践盼,刪除 hash 中這個(gè) tuple 的記錄慎王。
(3)fail: 這個(gè)接口復(fù)雜處理失敗的應(yīng)答,當(dāng)某個(gè) tuple 處理失敗而超時(shí)后會(huì)調(diào)用這個(gè)接口宏侍,一般選擇重新 emit 這條消息。
2.5 Storm高效實(shí)現(xiàn)可靠性的原理
在storm中有這樣一個(gè)special "acker" tasks蜀漆,它負(fù)責(zé)跟蹤所有由spout 發(fā)出的 tuple?產(chǎn)生的 DAG谅河。當(dāng)一個(gè) tuple 成功的在 DAG
中完成整個(gè)生命周期,這個(gè)task會(huì)通知 emit 這個(gè) tuple 的 spout task 這個(gè) tuple 被處理了确丢。所以如果期望消息至少被處理一次绷耍,最少要啟動(dòng)一個(gè) acker task,當(dāng)然你可以啟動(dòng)任意個(gè)鲜侥。
Storm會(huì)通過(guò) "mod hashing" 的方法將一個(gè) tuple 分配到合適的acker 去跟蹤褂始,因?yàn)槊恳粋€(gè) tuple 都對(duì)應(yīng)一個(gè)64位的唯一ID,并且在錨定 tuple 時(shí)這個(gè)ID也會(huì)隨之傳給新生成的 tuple描函,所以 DAG 中的每個(gè)節(jié)點(diǎn)根據(jù)這個(gè) ID 可以判斷應(yīng)答消息發(fā)送給哪個(gè) acker崎苗。同樣 acker 也能從在應(yīng)答消息中確認(rèn)哪個(gè) tuple 的狀態(tài)被更新了,當(dāng)一個(gè) tuple 的整個(gè) DAG 完成舀寓,acker 會(huì)發(fā)送確認(rèn)消息給源 spout胆数。
Acker不會(huì)明確的追蹤整個(gè) DAG,否則當(dāng) DAG 越發(fā)復(fù)雜時(shí)其負(fù)擔(dān)越重互墓。Acker 的追蹤算法非常之簡(jiǎn)潔高效必尼,并且只對(duì)于每個(gè)追蹤的tuple 只會(huì)占用大約20B的固定空間。
Storm會(huì)在系統(tǒng)中維護(hù)一個(gè)表,這個(gè)表的 key 是 acker 追蹤的每個(gè) tuple 的 ID判莉,value 的初始值也是這個(gè) ID豆挽。當(dāng) DAG 中的下游節(jié)點(diǎn)處理了這個(gè) tuple 后,acker 接到確認(rèn)信息后會(huì)做一個(gè) XOR 運(yùn)算券盅,用 XOR 的運(yùn)算結(jié)果來(lái)更新這個(gè) ID 在表中對(duì)應(yīng)的 val帮哈。
在這里需要說(shuō)明一下在DAG中每個(gè)新生成 tuple 都會(huì)有一個(gè)64位的隨機(jī)值ID(注意:不是其錨定的tuple傳來(lái)的spout emit的那個(gè)tuple 的ID。也就是說(shuō)每個(gè)新生成的 tuple 會(huì)有一個(gè)唯一 ID渗饮,新生成的 tuple 錨定某一個(gè) tuple 后也會(huì)知曉 spout tuple 的那個(gè) ID)但汞,在每個(gè)計(jì)算節(jié)點(diǎn),storm 會(huì)將這個(gè)計(jì)算節(jié)點(diǎn)生成的所有 tuple 的 ID 與所有輸入 tuple 的 ID 以及這個(gè) DAG 所追蹤的 tuple 在系統(tǒng)表中對(duì)應(yīng)的 value 做 XOR 操作互站,得到一個(gè)結(jié)果私蕾,并用這個(gè)結(jié)果更新系統(tǒng)表中對(duì)應(yīng)的 value。
2.6 Storm在各種失敗場(chǎng)景下的保障方法
情景1:DAG 中某個(gè)節(jié)點(diǎn)掛掉沒有正常發(fā)送 fail msg胡桃。這時(shí)其對(duì)應(yīng)的根節(jié)點(diǎn)的 tuple 最后會(huì)因超時(shí)而被 spout 重發(fā)踩叭。
情景2:跟蹤 tuple 的 acker task 掛了。此時(shí)翠胰,這個(gè)acker跟蹤的所有task都會(huì)因?yàn)槌瑫r(shí)而重發(fā)(因?yàn)?acker 不會(huì)更新其在系統(tǒng)中對(duì)應(yīng)的value)容贝。
情景3:spout 掛了。因?yàn)閟pout的輸入往往來(lái)自隊(duì)列之景,當(dāng) spout 掛掉后斤富,這個(gè) spout 沒有對(duì)隊(duì)列中的消息做確認(rèn)回應(yīng),所以隊(duì)列不會(huì)認(rèn)為這個(gè) spout 提走的數(shù)據(jù)被正常消費(fèi)了锻狗,而作"出隊(duì)"處理(其實(shí)是將執(zhí)行中并沒有確認(rèn)的數(shù)據(jù)重新歸隊(duì))满力。