之前一直在團(tuán)隊(duì)中接觸監(jiān)控平臺(tái),并沒有做相關(guān)的業(yè)務(wù)項(xiàng)目,經(jīng)過這一段時(shí)間以來此衅,對流式計(jì)算有了一個(gè)大體的認(rèn)識,之后的深入學(xué)習(xí)還要繼續(xù)亭螟,今天算是系統(tǒng)的將這個(gè)項(xiàng)目整理了一遍挡鞍。
這個(gè)工程各個(gè)團(tuán)隊(duì)公司都會(huì)或多或少的接觸過。主要是將團(tuán)隊(duì)中所有業(yè)務(wù)日志經(jīng)storm集群流式計(jì)算预烙,最后在平臺(tái)展現(xiàn)一些重要的監(jiān)控指標(biāo)墨微,例如異常率,執(zhí)行次數(shù)扁掸,平均執(zhí)行時(shí)間等等翘县。最后將這些指標(biāo)數(shù)據(jù)在web端進(jìn)行展示(主要是利用SpringBoot+Vue.js)。目前storm主流的有兩個(gè)谴分,一個(gè)是國外的锈麸,另一個(gè)是阿里自研的,這里所指的是國外的storm牺蹄,其實(shí)都大同小異忘伞,包括Flink,spark streaming,其思想和storm都差不多氓奈。
下圖是我畫的這個(gè)日志監(jiān)控系統(tǒng)的整體架構(gòu)圖匿刮,通過這樣的一個(gè)項(xiàng)目,在實(shí)時(shí)大量數(shù)據(jù)的產(chǎn)生情況下探颈,可以對流式計(jì)算有了一個(gè)初步的理解。
一.日志采集階段
所有的業(yè)務(wù)日志都會(huì)存到kafka中训措,那么它是如何存入到kafka中的呢伪节?我們知道log4j是一個(gè)很好的處理日志的開源框架,它主要靠三個(gè)重要組件绩鸣,Logger怀大,Appender和Layout。Logger是日志記錄器呀闻,負(fù)責(zé)收集處理日志記錄化借。Appender是日志輸出目的地,負(fù)責(zé)日志的輸出捡多,主要是輸出到什么地方蓖康,項(xiàng)目工程中每產(chǎn)生一條日志數(shù)據(jù),就規(guī)定它追加到Appender末尾垒手,之后在將Appender中所有的數(shù)據(jù)自動(dòng)傳輸?shù)絢afka蒜焊,Layout組件是日志格式化,負(fù)責(zé)對輸出的日志格式化(以什么形式展現(xiàn))
二.消息中間件kafka(之后日志數(shù)據(jù)的源頭)
關(guān)于kafka詳細(xì)的介紹科贬,官方文檔寫的很清楚泳梆,在這里就不一一闡述了,重點(diǎn)來說一下它作為一個(gè)分布式流處理平臺(tái)榜掌,如何安置這些流數(shù)據(jù)的优妙。Kafka有一個(gè)核心概念:提供一串流式的記錄— topic ,Kafka中的Topics總是多訂閱者模式憎账,一個(gè)topic可以擁有一個(gè)或者多個(gè)消費(fèi)者來訂閱它的數(shù)據(jù)套硼。對于每一個(gè)topic, Kafka集群都會(huì)維持一個(gè)分區(qū)日志鼠哥,如下所示:
每個(gè)分區(qū)都是有序且順序不可變的記錄集熟菲,并且不斷地追加到結(jié)構(gòu)化的commit log文件。分區(qū)中的每一個(gè)記錄都會(huì)分配一個(gè)id號來表示順序朴恳,我們稱之為offset抄罕,offset用來唯一的標(biāo)識分區(qū)中每一條記錄。
另外一方面kafka之所以大家喜歡用于颖,主要是它的讀寫速度很快呆贿,其實(shí)kafka中的數(shù)據(jù)是保存在磁盤上的,我們知道,磁盤的讀寫性能一般要弱于讀寫內(nèi)存性能做入,然而冒晰,實(shí)際上,快或慢關(guān)鍵在于尋址的方式竟块,磁盤分為順序讀寫與隨機(jī)讀寫壶运,內(nèi)存也一樣分為順序讀寫與隨機(jī)讀寫±嗣兀基于磁盤的隨機(jī)讀寫確實(shí)很慢蒋情,但磁盤的順序讀寫性能卻很高,一般而言要高出磁盤隨機(jī)讀寫三個(gè)數(shù)量級耸携,一些情況下磁盤順序讀寫性能甚至要高于內(nèi)存隨機(jī)讀寫棵癣。磁盤的順序讀寫是磁盤使用模式中最有規(guī)律的,并且操作系統(tǒng)也對這種模式做了大量優(yōu)化夺衍,Kafka就是使用了磁盤順序讀寫來提升的性能狈谊。Kafka的message是不斷追加到本地磁盤文件末尾的,而不是隨機(jī)的寫入沟沙,這使得Kafka寫入吞吐量得到了顯著提升河劝。
此外,在寫入數(shù)據(jù)的時(shí)候矛紫,kafka采用了MMFile(Memory mapped file)文件的形式丧裁,即便是順序?qū)懭胗脖P,硬盤的訪問速度還是追不上內(nèi)存含衔,所以kafka并不是實(shí)時(shí)的寫入硬盤煎娇,充分利用了操作系統(tǒng)現(xiàn)代分頁存儲(chǔ)技術(shù)來利用內(nèi)存提高IO效率。Memory Mapped Files(后面簡稱mmap)也被翻譯成 內(nèi)存映射文件 贪染,在64位操作系統(tǒng)中一般可以表示20G的數(shù)據(jù)文件缓呛,它的工作原理是直接利用操作系統(tǒng)的Page來實(shí)現(xiàn)文件到物理內(nèi)存的直接映射。
完成映射之后你對物理內(nèi)存的操作會(huì)被同步到硬盤上(操作系統(tǒng)在適當(dāng)?shù)臅r(shí)候)杭隙。
通過mmap哟绊,進(jìn)程像讀寫硬盤一樣讀寫內(nèi)存(當(dāng)然是虛擬機(jī)內(nèi)存),也不必關(guān)心內(nèi)存的大小有虛擬內(nèi)存為我們兜底痰憎。
使用這種方式可以獲取很大的I/O提升票髓,省去了用戶空間到內(nèi)核空間復(fù)制的開銷(調(diào)用文件的read會(huì)把數(shù)據(jù)先放到內(nèi)核空間的內(nèi)存中,然后再復(fù)制到用戶空間的內(nèi)存中铣耘。)
但也有一個(gè)很明顯的缺陷——不可靠洽沟,寫到mmap中的數(shù)據(jù)并沒有被真正的寫到硬盤,操作系統(tǒng)會(huì)在程序主動(dòng)調(diào)用flush的時(shí)候才把數(shù)據(jù)真正的寫到硬盤蜗细。
Kafka提供了一個(gè)參數(shù)——producer.type來控制是不是主動(dòng)flush裆操,如果Kafka寫入到mmap之后就立即flush然后再返回Producer叫 同步 (sync)怒详;寫入mmap之后立即返回Producer不調(diào)用flush叫異步 (async)。
在讀取數(shù)據(jù)方面踪区,kafka基于sendFile實(shí)現(xiàn)了少量的copy昆烁,傳統(tǒng)的網(wǎng)絡(luò)文件傳輸過程是
硬盤—>內(nèi)核buf—>用戶buf—>socket相關(guān)緩沖區(qū)—>協(xié)議引擎,可以看到缎岗,經(jīng)歷了4次的copy静尼,而通過sendFile的方式,減少了兩次的copy操作传泊。
Kafka把所有的消息都存放在一個(gè)一個(gè)的文件中茅郎,當(dāng)消費(fèi)者需要數(shù)據(jù)的時(shí)候Kafka直接把文件發(fā)送給消費(fèi)者,配合mmap作為文件讀寫方式或渤,直接把它傳給sendfile。
最后奕扣,kafka也采用了批量壓縮的方式薪鹦。在很多情況下,系統(tǒng)的瓶頸不是CPU或磁盤惯豆,而是網(wǎng)絡(luò)IO池磁,對于需要在廣域網(wǎng)上的數(shù)據(jù)中心之間發(fā)送消息的數(shù)據(jù)流水線尤其如此。進(jìn)行數(shù)據(jù)壓縮會(huì)消耗少量的CPU資源,不過對于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮楷兽。
1地熄、如果每個(gè)消息都壓縮,但是壓縮率相對很低芯杀,所以Kafka使用了批量壓縮端考,即將多個(gè)消息一起壓縮而不是單個(gè)消息壓縮
2、Kafka允許使用遞歸的消息集合揭厚,批量的消息可以通過壓縮的形式傳輸并且在日志中也可以保持壓縮格式却特,直到被消費(fèi)者解壓縮
3、Kafka支持多種壓縮協(xié)議筛圆,包括Gzip和Snappy壓縮協(xié)議
Kafka速度的秘訣在于裂明,它把所有的消息都變成一個(gè)批量的文件,并且進(jìn)行合理的批量壓縮太援,減少網(wǎng)絡(luò)IO損耗闽晦,通過mmap提高I/O速度,寫入數(shù)據(jù)的時(shí)候由于單個(gè)Partion是末尾添加所以速度最優(yōu)提岔;讀取數(shù)據(jù)的時(shí)候配合sendfile直接暴力輸出仙蛉。
三.storm處理流數(shù)據(jù)
storm是一個(gè)比較強(qiáng)大的分布式流計(jì)算框架,一些流式數(shù)據(jù)經(jīng)過storm集群預(yù)處理碱蒙,計(jì)算捅儒,存儲(chǔ)合并,得到我們想要的一些指標(biāo)數(shù)據(jù),最后持久化到mysql或者nosql數(shù)據(jù)庫中巧还。
先說幾個(gè)組件的概念鞭莽,有利于理解storm計(jì)算過程。
topology:storm的運(yùn)行是依靠?topology(拓?fù)洌﹣磉M(jìn)行的麸祷,開始時(shí)會(huì)新建一個(gè)?topology(拓?fù)洌┡炫瑂torm結(jié)束時(shí)將會(huì)殺死這個(gè)topology
spout:用來管理storm集群的各種輸入流
bolt:是處理邏輯組件,spout把數(shù)據(jù)傳遞給bolt阶牍,bolt可以進(jìn)行計(jì)算喷面,聚合,過濾走孽,查詢等等惧辈。
每個(gè)topology中會(huì)有spout,bolt等等磕瓷,簡單拓?fù)鋸膕pouts開始盒齿。Spouts將數(shù)據(jù)發(fā)射到一個(gè)或多個(gè)Bolts,并且Bolts的輸出可以發(fā)射到另一個(gè)Bolts作為輸入Spouts和Bolts連接在一起困食,形成拓?fù)浣Y(jié)構(gòu)边翁。實(shí)時(shí)應(yīng)用程序邏輯在Storm拓?fù)渲兄付ā?/p>
有了這幾個(gè)組件的概念之后,我們再來看storm集群的計(jì)算過程硕盹,究竟是如何將日志數(shù)據(jù)流變成我們最后想要的一些指標(biāo)數(shù)據(jù)符匾。首先storm去消費(fèi)kafka中的數(shù)據(jù)的第一步是建立拓?fù)洌負(fù)浣⒁粋€(gè)spout來收集kafka中的數(shù)據(jù)流瘩例,可以把spout理解為收集物料的作用啊胶,之后按照圖中所示,建立第一個(gè)bolt垛贤,它的作用是反序列化创淡,將從spout源中流出的數(shù)據(jù)流進(jìn)行反序列化的處理,當(dāng)?shù)谝粭l反序列化成功的對象過來之后南吮,會(huì)進(jìn)行對象轉(zhuǎn)換并進(jìn)行屬性值的初始化琳彩,然后存起來,當(dāng)下一條消息數(shù)據(jù)過來之后部凑,如果方法名相同露乏,則進(jìn)行相加更新操作,當(dāng)?shù)竭_(dá)一個(gè)時(shí)鐘周期時(shí)涂邀,將收集的所有對象先發(fā)送到下一個(gè)bolt瘟仿,同理,之后的操作會(huì)按照圖中那樣比勉,在各個(gè)bolt之間進(jìn)行處理(其實(shí)經(jīng)歷了反序列化bolt劳较,計(jì)算bolt驹止,合并bolt,存儲(chǔ)bolt)观蜗,在各個(gè)bolt之間的數(shù)據(jù)交接和傳遞過程中臊恋,并不會(huì)像mapreduce那樣,批量的處理和傳遞數(shù)據(jù)墓捻,storm實(shí)時(shí)性還是很強(qiáng)的抖仅,它只是會(huì)在每個(gè)時(shí)鐘周期點(diǎn)將收集的數(shù)據(jù)發(fā)送至下一個(gè)bolt。在最后一個(gè)bolt處理完成后砖第,會(huì)將處理好的數(shù)據(jù)存入Hbase撤卢,一些產(chǎn)生的靜態(tài)數(shù)據(jù)存入Mysql。
四.web展示
最后展示在web上的數(shù)據(jù)主要有兩部分梧兼,一部分是那些核心指標(biāo)放吩,例如異常率,執(zhí)行次數(shù)羽杰,平均執(zhí)行時(shí)間等渡紫,這一部分?jǐn)?shù)據(jù)主要是經(jīng)過storm計(jì)算后,從Hbase里面取忽洛。另一部分?jǐn)?shù)據(jù)是經(jīng)storm計(jì)算后,持久化之后的一些靜態(tài)數(shù)據(jù)环肘,例如產(chǎn)品線欲虚,服務(wù)名,方法名悔雹,報(bào)警配置信息复哆。
展示的話后端用了SpringBoot,持久層Mybatis腌零,前端利用Vue和elementUI來搭建的梯找。