Strom入門系列之二:storm簡單應(yīng)用實例
上一篇文章概要的介紹的 storm 的一些知識垒手,以及相關(guān)工作原理丢烘。本文將介紹本人在實際工作中實現(xiàn)的一個 Storm Topology。
需求
先簡要介紹一下業(yè)務(wù)場景——監(jiān)控系統(tǒng)
監(jiān)控系統(tǒng)架構(gòu)簡述
之前在一家 CDN 公司做監(jiān)控系統(tǒng)的開發(fā)甥雕,整個監(jiān)控系統(tǒng)負(fù)責(zé)做全網(wǎng)15000+設(shè)備的監(jiān)控剩晴、CDN相關(guān)服務(wù)的監(jiān)控以及其他業(yè)務(wù)的一些監(jiān)控较木。整個監(jiān)控系統(tǒng)結(jié)構(gòu)大概如下圖:
在這個監(jiān)控系統(tǒng)中,kafka 負(fù)責(zé)收集所有 agent 上報的消息蔚龙,storm 消費 kafka冰评。Storm 的一個 topology 負(fù)責(zé)將所有的消息轉(zhuǎn)存到 redis 用以告警,同時還將消息轉(zhuǎn)存的 opentsdb 時間序列數(shù)據(jù)庫中用以展示數(shù)據(jù)序列木羹。
業(yè)務(wù)數(shù)據(jù)聚合監(jiān)控
在上述結(jié)構(gòu)中甲雅,對于某一項指標(biāo)在某一臺設(shè)備上的告警需求基本能全面覆蓋,例如對設(shè)備的 Memory 的監(jiān)控坑填,agent 會定時上報設(shè)備的 memory 信息抛人,storm 轉(zhuǎn)存 redis 后,alarm 組件會定時的讀取redis 中的數(shù)據(jù)判端相關(guān)監(jiān)控項是否正常脐瑰,這種監(jiān)控我們稱之為設(shè)備監(jiān)控妖枚。
然而在更多的情況下這種單一設(shè)備維度的數(shù)據(jù)往往不能反應(yīng)整個業(yè)務(wù)的服務(wù)狀態(tài),比如苍在,對于某一個網(wǎng)站 "www.test.com" 的加速绝页,通常由一個集群下的 N 個設(shè)備的 nginx 實現(xiàn)荠商,而 nginx 日志中訪問 "www.test.com" 的狀態(tài)碼是監(jiān)控服務(wù)質(zhì)量的一個重要指標(biāo)。其中單臺設(shè)備的 nginx 的 5xx抒寂,4xx 狀態(tài)碼數(shù)量和占比往往能反正這臺設(shè)備是否服務(wù)正常结啼,這個屬于設(shè)備監(jiān)控;但是屈芜,另一方面郊愧,某一設(shè)備的服務(wù)異常在整體上并不能反映集群的服務(wù)質(zhì)量,整個集群的服務(wù)狀態(tài)數(shù)據(jù)是需要聚合整個集群的實時數(shù)據(jù)計算的井佑,這是設(shè)備監(jiān)控所不能滿足的需求属铁。
在這里我們使用 storm 聚合數(shù)據(jù),以頻道(網(wǎng)站名稱)為中心計算其在整個加速平臺中的 5xx躬翁,4xx 狀態(tài)碼的占比和數(shù)量并監(jiān)控焦蘑,從整體上可以把控所有加速服務(wù)在這一指標(biāo)上是否異常。
我們又實現(xiàn)了一個 topology 用于計算這種有數(shù)據(jù)聚合需求的監(jiān)控盒发,下面來看一下這個 topology 的實現(xiàn)例嘱。
設(shè)計
kafka消息格式
在討論 storm topology 之前先聊聊 kafka 中的數(shù)據(jù)格式。因為CDN 服務(wù)的的核心是 nginx宁舰,很多服務(wù)和業(yè)務(wù)的核心數(shù)據(jù)是直接可以從 log 獲取的拼卵,所以理想情況下能獲得原始 log 然后各種分析是最理想的。但是考慮到全網(wǎng) 15000+ 的設(shè)備蛮艰,在峰值時刻的帶寬成本及其昂貴腋腮,所以我們采取了一下折中,通過部署在設(shè)備上的agent 實時跟蹤 nginx 的日志流壤蚜,將每條日志進(jìn)行簡要簡析統(tǒng)計即寡,將統(tǒng)計結(jié)果每 5min 上報的 kafka,這樣基本解決的帶寬成本(但是會帶來 5min 的數(shù)據(jù)延時袜刷,可以根據(jù)不同的需求等級變更上報間隔)聪富。上報的數(shù)據(jù)格式為:
Spout設(shè)計
定義清楚消息格式后,可以開始設(shè)計 topology著蟹。Spout 這里比較簡單墩蔓,就是一個 kafka consumer 負(fù)責(zé)從 kafka 消費數(shù)據(jù)。目前草则,官方有提供現(xiàn)成的輪子 storm-kafka 包直接包含從 kafka 中讀取數(shù)據(jù)的 KafkaSpout钢拧,官方版的直接應(yīng)用 kafka 提供的 jave simple consumer 實現(xiàn)。
我們這里沒有使用官方的包炕横,是使用的 kafka 提供的高級 API 自建的 consumer源内。因為通過高級的 API 提供的 offset 的可以很方便的觀察 kafka各個 partition 的消費情況,便于維護(hù)排障,而且當(dāng)時而且官方的只支持 kafka-0.8.x 版本(使用中遇到過幾個bug膜钓,不過后續(xù)修復(fù)了嗽交,現(xiàn)在也支持更高的版本了,推薦使用官方颂斜,否則要自己實現(xiàn)對kafka consume 的 ack)夫壁。
Bolt 設(shè)計
kafka 的每條消息是某一臺設(shè)備近 5min 的被訪問的所有頻道的所有狀態(tài)碼的 count,也就是說以設(shè)備為中心沃疮,但需求是以頻道為中心盒让,所以在接到 spout 的 tuple 的第一個 bolt ——SplitBolt——需要做的就是將 tuple 的數(shù)據(jù) split 成多條以頻道為中心的 tuples,并將這些 tuples 通過 storm 的 fieldsGrouping 方式將 tuple 分組司蔬,保證頻道相同的數(shù)據(jù)都落在后續(xù)的同一個 bolt 實例上邑茄。
第二個 bolt——ComputeBolt——收到頻道為中心的數(shù)據(jù)后,在內(nèi)存中通過map記錄兩類數(shù)據(jù):
- 每個頻道在這一段時間內(nèi) 2xx俊啼,4xx肺缕,5xx 等狀態(tài)碼的總量
- 每個頻道每個狀態(tài)碼的全網(wǎng)設(shè)備中 TopN 設(shè)備的詳細(xì)數(shù)量和占比。
然后這個bolt 會定時(由于數(shù)據(jù)流式密集的授帕,所以這個bolt自身維護(hù)一個時間戳同木,每次處理新的tuple的后根據(jù)設(shè)備時間判斷是否超過5min,超過則像后續(xù)的bolt 發(fā)送數(shù)據(jù)跛十,否則繼續(xù)處理下一個tuple)的將統(tǒng)計結(jié)果分別提交給后續(xù)的 bolt 做存儲和告警彤路。這個bolt 的定時周期要和 agent 的上報周期相同,用以保證 storm 這里每一個匯集周期在正常情況下只會收到一臺設(shè)備的一條數(shù)據(jù)偶器。
另外三個 bolt——HbaseBolt斩萌,OpenTSDBBolt缝裤,TransferBolt——分別用于保存每 5min 的統(tǒng)計結(jié)果到對應(yīng)的存儲或服務(wù)屏轰,HbaseBolt 保存頻道在全網(wǎng) topN 設(shè)備的數(shù)據(jù);OpenTSDBBolt 保存這個時間點頻道的各個狀態(tài)碼總量和比例憋飞;TransferBolt負(fù)責(zé)將數(shù)據(jù)發(fā)送給告警服務(wù)(使用的小米開源的 Open-falcon 部分組件)霎苗。
這樣,這個 topology 就初步完成了榛做,從實際結(jié)果來看基本滿足業(yè)務(wù)需求唁盏。下面來一下簡要的代碼。
實現(xiàn)
整個topology的結(jié)構(gòu):
(全部代碼參見附錄)
優(yōu)化反思
數(shù)據(jù)延時
在設(shè)計那一小節(jié)中我們說道:
這個 bolt 的定時周期要和 agent 的上報周期相同检眯,用以保證 storm 這里每一個匯集周期在正常情況下只會收到一臺設(shè)備的一條數(shù)據(jù)厘擂。
可以注意到這是一種理想情況。在生產(chǎn)環(huán)境中锰瘸,往往因為網(wǎng)絡(luò)情況機(jī)器負(fù)載等情況出現(xiàn)一些延時刽严,這樣在 ComputeBolt 的一個處理周期(5min)內(nèi)可能會處理一臺設(shè)備的兩條甚至多條數(shù)據(jù),這會一定程度上影響聚合統(tǒng)計結(jié)果的準(zhǔn)確性避凝,但是可以接受的舞萄。
首先這不會在總體上影響數(shù)據(jù)一的數(shù)量變化眨补,其次,因為在數(shù)據(jù)傳輸?shù)难訒r是 storm 無法保證的倒脓,少量的延時不會影響整個頻道整體的數(shù)據(jù)趨勢的撑螺,從實踐效果來看基本不影響統(tǒng)計和告警,而大量的延時往往意味著服務(wù)集群整體故障崎弃、storm 集群集體故障亦或網(wǎng)絡(luò)故障等甘晤,這種情況是很快會被設(shè)備告警發(fā)現(xiàn)的,所以不會影響告警饲做。
但是在這個 topology 的設(shè)計中安皱,計時是以數(shù)據(jù)在 storm 處理過程中的系統(tǒng)時間作為數(shù)據(jù)生成時間的,而不是使用數(shù)據(jù)自帶的時間戳艇炎,所以數(shù)據(jù)堆積會影響統(tǒng)計結(jié)果酌伊,導(dǎo)致在某一小段時間內(nèi)數(shù)據(jù)集中,不均勻缀踪,不過以目前的需求來說是可以接受的居砖,如果希望嚴(yán)格分散則需要使用數(shù)據(jù)自帶的時間戳,重新設(shè)計topology驴娃。
時間窗口(已更新1.0.x正式版新特性)
在目前的機(jī)制中奏候,Computebolt 是在計算完每一個 tuple 后查看系統(tǒng)時間,和 compute 內(nèi)存中記錄的上次發(fā)送時間做對比唇敞,如果超過 5min蔗草,則將統(tǒng)計結(jié)果 emit 到后面的 bolts 中。
很顯然疆柔,這種設(shè)計方法會存在一定的誤差咒精,在數(shù)據(jù)密集時,這種誤差很小旷档,以線上的生產(chǎn)環(huán)境來看15000+設(shè)備模叙,每 5min 發(fā)送一次數(shù)據(jù),平均每秒50條數(shù)據(jù)鞋屈,誤差很蟹蹲伞(根據(jù)觀察在1s以內(nèi))。
但是數(shù)據(jù)稀疏時則會有較大幾率產(chǎn)生高誤差厂庇,比如當(dāng)兩條數(shù)據(jù)間隔超過 N 秒后渠啊,又恰巧這 N 秒內(nèi)達(dá)到 5min 發(fā)送閾值,則可能會產(chǎn)生最大 N 秒的時間偏差,從而導(dǎo)致誤差,或者數(shù)據(jù)缺失徒欣。
現(xiàn)在時間窗口在統(tǒng)計方面是一種很常見的需求锋玲,在 storm 的官方examples 中給出了一種解決方法來時間這種滑動窗口:新增一個spout灭返,它每隔 5min 向 ComputeBolt 發(fā)送一個無意義的 tuple盗迟,使用 all-grouping 方式,保證每隔 ComputeBolt 都會接收到這個tuple熙含,從而 ComputeBolt 根據(jù) tuple 的來源判斷是有意的統(tǒng)計數(shù)據(jù)還是一個 emit 統(tǒng)計結(jié)果的 flag tuple罚缕。
不過我個人認(rèn)為這里可能仍然會出現(xiàn)問題,因為 storm 是保證最少處理一次怎静,所以可能會發(fā)生重發(fā)這個無意義 tuple 的情景邮弹,這樣可能出出現(xiàn)一些意外~
在storm發(fā)布1.0.x正式版后,新增的時間窗口功能蚓聘,可以很方便的實現(xiàn)這種統(tǒng)計結(jié)果 Sliding Window 和 Tumbling Window 兩種接口腌乡,可以根據(jù)接口靈活選擇(相關(guān)文檔參見附錄)。
ps: 在后文會有一片關(guān)于window的使用介紹夜牡。
動態(tài)配置
在 topology 有可能會出現(xiàn)另一種需求——不重啟 topology与纽,動態(tài)修改配置。其實這個方法的實現(xiàn)思路類似上文塘装,是使用一個新的spout急迂,這個 spout 是個 http-server 或 http-client,其可以接受或者定期拉取各種配置蹦肴,將配置發(fā)送給 bolt僚碎,同時 bolt 根據(jù) tuple 來源確定這是一個配置 tuple,用以更新自身配置阴幌。
小結(jié)
本文簡要的實現(xiàn)一個 topology勺阐,比較簡單,可能還有坑矛双。渊抽。。僅供參考背零。
Ps :目前已經(jīng)離職了腰吟,這里的優(yōu)化其中一些在其他 topology 中使用了无埃,但是這個 topology 一直沒空改徙瓶。。嫉称。github上就是沒改過的侦镇。。织阅。等有時間慢慢改吧壳繁。。。