一风范、Storm是什么
Storm是一個免費并開源的分布式實時計算系統(tǒng)。利用Storm可以很容易做到可靠地處理無限的數(shù)據(jù)流刻盐,像Hadoop批量處理大數(shù)據(jù)一樣诸尽,Storm可以實時處理數(shù)據(jù)。Storm簡單挨决,可以使用任何編程語言请祖。
在Storm之前,進行實時處理是非常痛苦的事情: 需要維護一堆消息隊列和消費者脖祈,他們構(gòu)成了非常復雜的圖結(jié)構(gòu)肆捕。消費者進程從隊列里取消息,處理完成后盖高,去更新數(shù)據(jù)庫慎陵,或者給其他隊列發(fā)新消息眼虱。
這樣進行實時處理是非常痛苦的。我們主要的時間都花在關(guān)注往哪里發(fā)消息席纽,從哪里接收消息捏悬,消息如何序列化,真正的業(yè)務(wù)邏輯只占了源代碼的一小部分胆筒。一個應(yīng)用程序的邏輯運行在很多worker上邮破,但這些worker需要各自單獨部署,還需要部署消息隊列仆救。最大問題是系統(tǒng)很脆弱抒和,而且不是容錯的:需要自己保證消息隊列和worker進程工作正常。
Storm完整地解決了這些問題彤蔽。它是為分布式場景而生的摧莽,抽象了消息傳遞,會自動地在集群機器上并發(fā)地處理流式計算顿痪,讓你專注于實時處理的業(yè)務(wù)邏輯镊辕。
二、Storm的特點
Storm有如下特點:
- 編程簡單:開發(fā)人員只需要關(guān)注應(yīng)用邏輯蚁袭,而且跟Hadoop類似征懈,Storm提供的編程原語也很簡單
- 高性能,低延遲:可以應(yīng)用于廣告搜索引擎這種要求對廣告主的操作進行實時響應(yīng)的場景揩悄。
- 分布式:可以輕松應(yīng)對數(shù)據(jù)量大卖哎,單機搞不定的場景
- 可擴展: 隨著業(yè)務(wù)發(fā)展,數(shù)據(jù)量和計算量越來越大删性,系統(tǒng)可水平擴展
- 容錯:單個節(jié)點掛了不影響應(yīng)用
- 消息不丟失:保證消息處理
不過Storm不是一個完整的解決方案亏娜。使用Storm時你需要關(guān)注以下幾點: - 如果使用的是自己的消息隊列蕴坪,需要加入消息隊列做數(shù)據(jù)的來源和產(chǎn)出的代碼
- 需要考慮如何做故障處理:如何記錄消息隊列處理的進度搜锰,應(yīng)對Storm重啟,掛掉的場景
- 需要考慮如何做消息的回退:如果某些消息處理一直失敗怎么辦彬檀?
三巴帮、Storm的應(yīng)用
跟Hadoop不一樣溯泣,Storm是沒有包括任何存儲概念的計算系統(tǒng)。這就讓Storm可以用在多種不同的場景下:非傳統(tǒng)場景下數(shù)據(jù)動態(tài)到達或者數(shù)據(jù)存儲在數(shù)據(jù)庫這樣的存儲系統(tǒng)里(或數(shù)據(jù)是被實時操控其他設(shè)備的控制器(如交易系統(tǒng))所消費)
Storm有很多應(yīng)用:實時分析榕茧,在線機器學習(online machine learning)发乔,連續(xù)計算(continuous computation),分布式遠程過程調(diào)用(RPC)雪猪、ETL等。Storm處理速度很快:每個節(jié)點每秒鐘可以處理超過百萬的數(shù)據(jù)組起愈。它是可擴展(scalable)只恨,容錯(fault-tolerant)译仗,保證你的數(shù)據(jù)會被處理,并且很容易搭建和操作官觅。
例如Nathan Marz提供的例子纵菌,產(chǎn)生Twitter的趨勢信息。Twitter從海量推文中抽取趨勢信息休涤,并在本地區(qū)域和國家層級進行維護咱圆。這意味者一旦一個案例開始出現(xiàn),Twitter的話題趨勢算法就能實時的鑒別出這個話題功氨。這個實時的算法就是通過在Storm上連續(xù)分析Twitter數(shù)據(jù)來實現(xiàn)的序苏。
三、Storm模型
Storm實現(xiàn)了一個數(shù)據(jù)流(data flow)的模型捷凄,在這個模型中數(shù)據(jù)持續(xù)不斷地流經(jīng)一個由很多轉(zhuǎn)換實體構(gòu)成的網(wǎng)絡(luò)忱详。一個數(shù)據(jù)流的抽象叫做流(stream),流是無限的元組(Tuple)序列跺涤。元組就像一個可以表示標準數(shù)據(jù)類型(例如int匈睁,float和byte數(shù)組)和用戶自定義類型(需要額外序列化代碼的)的數(shù)據(jù)結(jié)構(gòu)。每個流由一個唯一的ID來標示的桶错,這個ID可以用來構(gòu)建拓撲中各個組件的數(shù)據(jù)源航唆。
如下圖所示,其中的水龍頭代表了數(shù)據(jù)流的來源院刁,一旦水龍頭打開糯钙,數(shù)據(jù)就會源源不斷地流經(jīng)Bolt而被處理。圖中有三個流黎比,用不同的顏色來表示超营,每個數(shù)據(jù)流中流動的是元組(Tuple),它承載了具體的數(shù)據(jù)阅虫。元組通過流經(jīng)不同的轉(zhuǎn)換實體而被處理演闭。
Storm對數(shù)據(jù)輸入的來源和輸出數(shù)據(jù)的去向沒有做任何限制。像Hadoop颓帝,是需要把數(shù)據(jù)放到自己的文件系統(tǒng)HDFS里的米碰。在Storm里,可以使用任意來源的數(shù)據(jù)輸入和任意的數(shù)據(jù)輸出购城,只要你實現(xiàn)對應(yīng)的代碼來獲取/寫入這些數(shù)據(jù)就可以吕座。典型場景下,輸入/輸出數(shù)據(jù)來是基于類似Kafka或者ActiveMQ這樣的消息隊列瘪板,但是數(shù)據(jù)庫吴趴,文件系統(tǒng)或者web服務(wù)也都是可以的。
四侮攀、概念
1. 拓撲(Topologies)
一個Storm拓撲打包了一個實時處理程序的邏輯锣枝。一個Storm拓撲跟一個MapReduce的任務(wù)(job)是類似的厢拭。主要區(qū)別是MapReduce任務(wù)最終會結(jié)束,而拓撲會一直運行(當然直到你殺死它)撇叁。一個拓撲是一個通過流分組(stream grouping)把Spout和Bolt連接到一起的拓撲結(jié)構(gòu)供鸠。圖的每條邊代表一個Bolt訂閱了其他Spout或者Bolt的輸出流。一個拓撲就是一個復雜的多階段的流計算陨闹。
2. 元組(Tuple)
元組是Storm提供的一個輕量級的數(shù)據(jù)格式楞捂,可以用來包裝你需要實際處理的數(shù)據(jù)。元組是一次消息傳遞的基本單元趋厉。一個元組是一個命名的值列表寨闹,其中的每個值都可以是任意類型的。元組是動態(tài)地進行類型轉(zhuǎn)化的--字段的類型不需要事先聲明觅廓。在Storm中編程時鼻忠,就是在操作和轉(zhuǎn)換由元組組成的流。通常杈绸,元組包含整數(shù)帖蔓,字節(jié),字符串瞳脓,浮點數(shù)塑娇,布爾值和字節(jié)數(shù)組等類型。要想在元組中使用自定義類型劫侧,就需要實現(xiàn)自己的序列化方式埋酬。
3. 流(Streams)
流是Storm中的核心抽象。一個流由無限的元組序列組成烧栋,這些元組會被分布式并行地創(chuàng)建和處理写妥。通過流中元組包含的字段名稱來定義這個流。每個流聲明時都被賦予了一個ID审姓。只有一個流的Spout和Bolt非常常見珍特,所以OutputFieldsDeclarer
提供了不需要指定ID來聲明一個流的函數(shù)(Spout和Bolt都需要聲明輸出的流)。這種情況下魔吐,流的ID是默認的“default”扎筒。
4. Spouts
Spout(噴嘴,這個名字很形象)是Storm中流的來源酬姆。通常Spout從外部數(shù)據(jù)源嗜桌,如消息隊列中讀取元組數(shù)據(jù)并吐到拓撲里。Spout可以是可靠的(reliable)或者不可靠(unreliable)的辞色」浅瑁可靠的Spout能夠在一個元組被Storm處理失敗時重新進行處理,而非可靠的Spout只是吐數(shù)據(jù)到拓撲里,不關(guān)心處理成功還是失敗了层亿。
Spout可以一次給多個流吐數(shù)據(jù)壶唤。此時需要通過OutputFieldsDeclarer的declareStream函數(shù)來聲明多個流并在調(diào)用SpoutOutputCollector提供的emit方法時指定元組吐給哪個流。
Spout中最主要的函數(shù)是nextTuple棕所,Storm框架會不斷調(diào)用它去做元組的輪詢。如果沒有新的元組過來悯辙,就直接返回琳省,否則把新元組吐到拓撲里。nextTuple必須是非阻塞的躲撰,因為Storm在同一個線程里執(zhí)行Spout的函數(shù)针贬。
Spout中另外兩個主要的函數(shù)是ack和fail。當Storm檢測到一個從Spout吐出的元組在拓撲中成功處理完時調(diào)用ack,沒有成功處理完時調(diào)用fail拢蛋。只有可靠型的Spout會調(diào)用ack和fail函數(shù)桦他。更多細節(jié)可以查看Storm Java文檔和我的另外一篇文章:Storm如何保證可靠的消息處理
5. Bolts
在拓撲中所有的計算邏輯都是在Bolt中實現(xiàn)的。一個Bolt可以處理任意數(shù)量的輸入流谆棱,產(chǎn)生任意數(shù)量新的輸出流快压。Bolt可以做函數(shù)處理,過濾垃瞧,流的合并蔫劣,聚合,存儲到數(shù)據(jù)庫等操作个从。Bolt就是流水線上的一個處理單元脉幢,把數(shù)據(jù)的計算處理過程合理的拆分到多個Bolt、合理設(shè)置Bolt的task數(shù)量嗦锐,能夠提高Bolt的處理能力嫌松,提升流水線的并發(fā)度。
Bolt可以給多個流吐出元組數(shù)據(jù)奕污。此時需要使用OutputFieldsDeclarer的declareStream方法來聲明多個流并在使用OutputColletor的emit方法時指定給哪個流吐數(shù)據(jù)萎羔。
當你聲明了一個Bolt的輸入流,也就訂閱了另外一個組件的某個特定的輸出流菊值。如果希望訂閱另一個組件的所有流外驱,需要單獨挨個訂閱。InputDeclarer有語法糖來訂閱ID為默認值的流腻窒。例如declarer.shuffleGrouping("redBolt")訂閱了redBolt組件上的默認流昵宇,跟declarer.shuffleGrouping("redBolt", DEFAULT_STREAM_ID)是相同的。
在Bolt中最主要的函數(shù)是execute函數(shù)儿子,它使用一個新的元組當作輸入瓦哎。Bolt使用OutputCollector對象來吐出新的元組。Bolts必須為處理的每個元組調(diào)用OutputCollector的ack方法以便于Storm知道元組什么時候被各個Bolt處理完了(最終就可以確認Spout吐出的某個元組處理完了)。通常處理一個輸入的元組時蒋譬,會基于這個元組吐出零個或者多個元組割岛,然后確認(ack)輸入的元組處理完了,Storm提供了IBasicBolt接口來自動完成確認犯助。
必須注意OutputCollector不是線程安全的癣漆,所以所有的吐數(shù)據(jù)(emit)、確認(ack)剂买、通知失敗(fail)必須發(fā)生在同一個線程里惠爽。更多信息可以參照問題定位。
6. 任務(wù)(Tasks)
每個Spout和Bolt會以多個任務(wù)(Task)的形式在集群上運行瞬哼。每個任務(wù)對應(yīng)一個執(zhí)行線程婚肆,流分組定義了如何從一組任務(wù)(同一個Bolt)發(fā)送元組到另外一組任務(wù)(另外一個Bolt)上∽浚可以在調(diào)用TopologyBuilder的setSpout和setBolt函數(shù)時設(shè)置每個Spout和Bolt的并發(fā)數(shù)较性。
7. 組件(Component)
組件(component)是對Bolt和Spout的統(tǒng)稱
8. 流分組(Stream Grouping)
定義拓撲的時候,一部分工作是指定每個Bolt應(yīng)該消費哪些流结胀。流分組定義了一個流在一個消費它的Bolt內(nèi)的多個任務(wù)(task)之間如何分組赞咙。流分組跟計算機網(wǎng)絡(luò)中的路由功能是類似的,決定了每個元組在拓撲中的處理路線把跨。
在Storm中有七個內(nèi)置的流分組策略人弓,你也可以通過實現(xiàn)CustomStreamGrouping接口來自定義一個流分組策略:
1. 洗牌分組(Shuffle grouping): 隨機分配元組到Bolt的某個任務(wù)上,這樣保證同一個Bolt的每個任務(wù)都能夠得到相同數(shù)量的元組着逐。
2. 字段分組(Fields grouping): 按照指定的分組字段來進行流的分組崔赌。例如,流是用字段“user-id"來分組的耸别,那有著相同“user-id"的元組就會分到同一個任務(wù)里健芭,但是有不同“user-id"的元組就會分到不同的任務(wù)里。這是一種非常重要的分組方式秀姐,通過這種流分組方式慈迈,我們就可以做到讓Storm產(chǎn)出的消息在這個"user-id"級別是嚴格有序的,這對一些對時序敏感的應(yīng)用(例如省有,計費系統(tǒng))是非常重要的痒留。
3. Partial Key grouping: 跟字段分組一樣,流也是用指定的分組字段進行分組的蠢沿,但是在多個下游Bolt之間是有負載均衡的伸头,這樣當輸入數(shù)據(jù)有傾斜時可以更好的利用資源。這篇論文很好的解釋了這是如何工作的舷蟀,有哪些優(yōu)勢恤磷。
4. All grouping: 流會復制給Bolt的所有任務(wù)面哼。小心使用這種分組方式。在拓撲中扫步,如果希望某類元祖發(fā)送到所有的下游消費者魔策,就可以使用這種All grouping的流分組策略。
5. Global grouping: 整個流會分配給Bolt的一個任務(wù)河胎。具體一點闯袒,會分配給有最小ID的任務(wù)。
6. 不分組(None grouping): 說明不關(guān)心流是如何分組的游岳。目前搁吓,None grouping等價于洗牌分組。
7. Direct grouping:一種特殊的分組吭历。對于這樣分組的流,元組的生產(chǎn)者決定消費者的哪個任務(wù)會接收處理這個元組擂橘。只能在聲明做直連的流(direct streams)上聲明Direct groupings分組方式晌区。只能通過使用emitDirect系列函數(shù)來吐元組給直連流。一個Bolt可以通過提供的TopologyContext來獲得消費者的任務(wù)ID通贞,也可以通過OutputCollector對象的emit函數(shù)(會返回元組被發(fā)送到的任務(wù)的ID)來跟蹤消費者的任務(wù)ID朗若。在ack的實現(xiàn)中,Spout有兩個直連輸入流昌罩,ack和ackFail哭懈,使用了這種直連分組的方式。
8. Local or shuffle grouping:如果目標Bolt在同一個worker進程里有一個或多個任務(wù)茎用,元組就會通過洗牌的方式分配到這些同一個進程內(nèi)的任務(wù)里遣总。否則,就跟普通的洗牌分組一樣轨功。這種方式的好處是可以提高拓撲的處理效率旭斥,因為worker內(nèi)部通信就是進程內(nèi)部通信了,相比拓撲間的進程間通信要高效的多古涧。worker進程間通信是通過使用Netty來進行網(wǎng)絡(luò)通信的垂券。
9. 可靠性(Reliability)
Storm保證了拓撲中Spout產(chǎn)生的每個元組都會被處理。Storm是通過跟蹤每個Spout所產(chǎn)生的所有元組構(gòu)成的樹形結(jié)構(gòu)并得知這棵樹何時被完整地處理來達到可靠性羡滑。每個拓撲對這些樹形結(jié)構(gòu)都有一個關(guān)聯(lián)的“消息超時”菇爪。如果在這個超時時間里Storm檢測到Spout產(chǎn)生的一個元組沒有被成功處理完,那Sput的這個元組就處理失敗了柒昏,后續(xù)會重新處理一遍凳宙。
為了發(fā)揮Storm的可靠性,需要你在創(chuàng)建一個元組樹中的一條邊時告訴Storm昙楚,也需要在處理完每個元組之后告訴Storm近速。這些都是通過Bolt吐元組數(shù)據(jù)用的OutputCollector對象來完成的诈嘿。標記是在emit函數(shù)里完成,完成一個元組后需要使用ack函數(shù)來告訴Storm削葱。
這些都在“保證消息處理”一文中會有更詳細的介紹奖亚。
10. Workers(工作進程)
拓撲以一個或多個Worker進程的方式運行。每個Worker進程是一個物理的Java虛擬機析砸,執(zhí)行拓撲的一部分任務(wù)昔字。例如,如果拓撲的并發(fā)設(shè)置成了300首繁,分配了50個Worker作郭,那么每個Worker執(zhí)行6個任務(wù)(作為Worker內(nèi)部的線程)。Storm會盡量把所有的任務(wù)均分到所有的Worker上弦疮。
五夹攒、Storm中用到的技術(shù)
ZeroMQ 提供了可擴展環(huán)境下的傳輸層高效消息通信,一開始Storm的內(nèi)部通信使用的是ZeroMQ胁塞,后來作者想把Storm移交給Apache開源基金會來管理咏尝,而ZeroMQ的許可證書跟Apache基金會的政策有沖突。在Storm中啸罢,Netty比ZeroMQ更加高效编检,而且提供了worker間通信時的驗證機制,所以在Storm0.9中扰才,就改用了Netty允懂。
Clojure Storm系統(tǒng)的實現(xiàn)語言。Clojure是由Rich Hicky作為一種通用語言發(fā)明的衩匣,它衍生自Lisp語言蕾总,簡化了多線程編程。
**Apache ZooKeeper **Zookeeper是一個實現(xiàn)高可靠的分布式協(xié)作的開源項目琅捏。Storm使用Zookeeper來協(xié)調(diào)集群中的多個節(jié)點谤专。