實(shí)時計(jì)算
由于離線計(jì)算無法滿足實(shí)時性要求較高的業(yè)務(wù)膀曾,下面我們來了解一下實(shí)時計(jì)算胜榔。
離線和批量获印、實(shí)時和流式
在聊實(shí)時計(jì)算之前扒俯,先說一下我對離線和批量味抖、實(shí)時和流式的一些看法紧唱。
我們首先來簡單看一下計(jì)算任務(wù)的大致流程:
首先先說下批量計(jì)算和流式計(jì)算:
圖中顯示了一個計(jì)算的基本流程活尊,receiver處負(fù)責(zé)從數(shù)據(jù)源接收數(shù)據(jù),并發(fā)送給下游的task漏益,數(shù)據(jù)由task處理后由sink端輸出蛹锰。
以圖為例,批量和流式處理數(shù)據(jù)粒度不一樣绰疤,批量每次處理一定大小的數(shù)據(jù)塊(輸入一般采用文件系統(tǒng))铜犬,一個task處理完一個數(shù)據(jù)塊之后,才將處理好的中間數(shù)據(jù)發(fā)送給下游轻庆。流式計(jì)算則是以record為單位癣猾,task在處理完一條記錄之后,立馬發(fā)送給下游余爆。
假如我們是對一些固定大小的數(shù)據(jù)做統(tǒng)計(jì)纷宇,那么采用批量和流式效果基本相同,但是流式有一個好處就是可以實(shí)時得到計(jì)算中的結(jié)果蛾方,這對某些應(yīng)用很有幫助像捶,比如每1分鐘統(tǒng)計(jì)一下請求server的request次數(shù)上陕。
那問題來了,既然流式系統(tǒng)也可以做批量系統(tǒng)的事情作岖,而且還提供了更多的功能唆垃,那為什么還需要批量系統(tǒng)呢?因?yàn)樵缙诘牧魇较到y(tǒng)并不成熟痘儡,存在如下問題:
1.流式系統(tǒng)的吞吐不如批量系統(tǒng)
2.流式系統(tǒng)無法提供精準(zhǔn)的計(jì)算
后面的介紹Storm辕万、Spark streaming、Flink主要根據(jù)這兩點(diǎn)來進(jìn)行介紹沉删。
批量和流式的區(qū)別:
1.數(shù)據(jù)處理單位:
批量計(jì)算按數(shù)據(jù)塊來處理數(shù)據(jù)渐尿,每一個task接收一定大小的數(shù)據(jù)塊,比如MR矾瑰,map任務(wù)在處理完一個完整的數(shù)據(jù)塊后(比如128M)砖茸,然后將中間數(shù)據(jù)發(fā)送給reduce任務(wù)。
流式計(jì)算的上游算子處理完一條數(shù)據(jù)后殴穴,會立馬發(fā)送給下游算子凉夯,所以一條數(shù)據(jù)從進(jìn)入流式系統(tǒng)到輸出結(jié)果的時間間隔較短(當(dāng)然有的流式系統(tǒng)為了保證吞吐,也會對數(shù)據(jù)做buffer)采幌。
這樣的結(jié)果就是:批量計(jì)算往往得等任務(wù)全部跑完之后才能得到結(jié)果劲够,而流式計(jì)算則可以實(shí)時獲取最新的計(jì)算結(jié)果。
2.數(shù)據(jù)源:
批量計(jì)算通常處理的是有限數(shù)據(jù)(bound data)休傍,數(shù)據(jù)源一般采用文件系統(tǒng)征绎,而流式計(jì)算通常處理無限數(shù)據(jù)(unbound data),一般采用消息隊(duì)列作為數(shù)據(jù)源磨取。
3.任務(wù)類型:
批量計(jì)算中的每個任務(wù)都是短任務(wù)人柿,任務(wù)在處理完其負(fù)責(zé)的數(shù)據(jù)后關(guān)閉,而流式計(jì)算往往是長任務(wù)忙厌,每個work一直運(yùn)行凫岖,持續(xù)接受數(shù)據(jù)源傳過來的數(shù)據(jù)。
離線=批量逢净?實(shí)時=流式隘截?
習(xí)慣上我們認(rèn)為離線和批量等價;實(shí)時和流式等價汹胃,但其實(shí)這種觀點(diǎn)并不完全正確。
假設(shè)一種情況:當(dāng)我們擁有一個非常強(qiáng)大的硬件系統(tǒng)东臀,可以毫秒級的處理Gb級別的數(shù)據(jù)着饥,那么批量計(jì)算也可以毫秒級得到統(tǒng)計(jì)結(jié)果(當(dāng)然這種情況非常極端,目前不可能)惰赋,那我們還能說它是離線計(jì)算嗎宰掉?
所以說離線和實(shí)時應(yīng)該指的是:數(shù)據(jù)處理的延遲呵哨;批量和流式指的是:數(shù)據(jù)處理的方式。兩者并沒有必然的關(guān)系轨奄。事實(shí)上Spark streaming就是采用小批量(batch)的方式來實(shí)現(xiàn)實(shí)時計(jì)算孟害。
可以參考下面鏈接:https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101。作者是Google實(shí)時計(jì)算的負(fù)責(zé)人挪拟,里面闡述了他對批量和實(shí)時的理解挨务,并且作者認(rèn)為批量計(jì)算只是流式計(jì)算的子集,一個設(shè)計(jì)良好的流式系統(tǒng)完全可以替代批量系統(tǒng)玉组。本人也從中受到了很多啟發(fā)谎柄。
介紹完這些概念后,下面我們就來簡單看看目前流行的實(shí)時計(jì)算框架的實(shí)現(xiàn)和區(qū)別惯雳。
Storm
Storm做為最早的一個實(shí)時計(jì)算框架朝巫,早期應(yīng)用于各大互聯(lián)網(wǎng)公司,這里我們依然使用work count舉例:
spout:負(fù)責(zé)從數(shù)據(jù)源接收數(shù)據(jù)
bolt:負(fù)責(zé)數(shù)據(jù)處理石景,最下游的bolt負(fù)責(zé)數(shù)據(jù)輸出
spout不斷從數(shù)據(jù)源接收數(shù)據(jù)劈猿,然后按一定規(guī)則發(fā)送給下游的bolt進(jìn)行計(jì)算,最下游的bolt將最終結(jié)果輸出到外部系統(tǒng)中(這里假設(shè)輸出到DB)潮孽,這樣我們在DB中就可以看到最新的數(shù)據(jù)統(tǒng)計(jì)結(jié)果揪荣。Storm每一層的算子都可以配置多個,這樣保證的水平擴(kuò)展性恩商。因?yàn)橥幚淼氖莡nbound data变逃,所以storm中的算子都是長任務(wù)。
容災(zāi)
容災(zāi)是所有系統(tǒng)都需要考慮的一個問題怠堪,考慮一下:假如運(yùn)行過程中揽乱,一個算子(bolt)因某種原因掛了,Storm如何恢復(fù)這個任務(wù)呢粟矿?
批處理解決方案就比較簡單凰棉,拿MR舉例,假如一個運(yùn)行中map或reduce失敗陌粹,那么任務(wù)重新提交一遍就ok(只不過重頭計(jì)算又要花費(fèi)大量時間)撒犀,下面我們看看Storm是如何解決的:
storm的spout有一個buffer,會緩存接收到的record掏秩,并且Storm還有一個acker(可以認(rèn)為是一個特殊的bolt任務(wù))或舞,每條record和該record所產(chǎn)生的所有tuple在處理完成后都會向?qū)?yīng)的acker發(fā)送ack消息,當(dāng)acker接收到該record所有的ack消息之后蒙幻,便認(rèn)為該record處理成功映凳,并通知spout從buffer中將該record移除,若receiver沒有在規(guī)定的時間內(nèi)接收到ack邮破,acker則通知spout重放數(shù)據(jù)诈豌。
acker個數(shù)可以由用戶指定仆救,因?yàn)閿?shù)據(jù)量比較大時,一個acker可能處理不過來所有的ack信息矫渔,成為系統(tǒng)瓶頸(如果可以容忍數(shù)據(jù)丟失彤蔽,當(dāng)然也可以關(guān)閉ack機(jī)制,可以顯著提高系統(tǒng)性能)庙洼。并且acker采用了巧妙的機(jī)制顿痪,優(yōu)化了ack機(jī)制的資源占用(有興趣的同學(xué)可以參考官網(wǎng),網(wǎng)上也有很多博客介紹ack具體實(shí)現(xiàn))送膳。
Storm采用ack機(jī)制實(shí)現(xiàn)了數(shù)據(jù)的重放员魏,盡管做了很多優(yōu)化,但是畢竟每條record和它產(chǎn)生的tuple都需要ack叠聋,對吞吐還是有較大的影響撕阎,關(guān)閉ack的話,對于某些不允許丟數(shù)據(jù)的業(yè)務(wù)來說又是不可接受的碌补。
Storm的這種特點(diǎn)會導(dǎo)致大家認(rèn)為:流式計(jì)算的吞吐不如批量計(jì)算虏束。(這點(diǎn)其實(shí)是不對的,只能說Storm的設(shè)計(jì)導(dǎo)致了它的吞吐不如批量計(jì)算厦章,一個設(shè)計(jì)優(yōu)秀的流式系統(tǒng)是有可能擁有和批處理系統(tǒng)一樣的吞吐)
數(shù)據(jù)不重不丟
之前我們提到早期的流式系統(tǒng)無法提供精準(zhǔn)的計(jì)算服務(wù)镇匀,下面我們詳細(xì)了解一下:
sink處的重復(fù)輸出:假如運(yùn)行過程中,boltA數(shù)據(jù)入庫后袜啃,boltB因?yàn)槟撤N原因crash了汗侵,這時候會導(dǎo)致該record重放,boltA中已經(jīng)處理過的數(shù)據(jù)會再次入庫群发,導(dǎo)致部分?jǐn)?shù)據(jù)重復(fù)輸出晰韵。
不僅sink處存在重復(fù)輸出的問題,receiver處也同樣存在這種問題熟妓。(在講解Spark streaming處會詳細(xì)介紹什么情況下receiver會重復(fù)接收數(shù)據(jù))
Storm沒有提供exactly once的功能雪猪,并且開啟ack功能后又會嚴(yán)重影響吞吐,所以會給大家一種印象:流式系統(tǒng)只適合吞吐相對較小的起愈、低延遲不精確的計(jì)算只恨;而精確的計(jì)算則需要由批處理系統(tǒng)來完成,所以出現(xiàn)了Lambda架構(gòu)抬虽,該架構(gòu)由Storm的創(chuàng)始人提出官觅,簡單的理解就是同時運(yùn)行兩個系統(tǒng):一個流式,一個批量阐污,用批量計(jì)算的精確性來彌補(bǔ)流式計(jì)算的不足缰猴,但是這個架構(gòu)存在一個問題就是需要同時維護(hù)兩套系統(tǒng),代價比較大疤剑。
那么有沒有一種架構(gòu)滑绒,可以滿足高吞吐、低延遲的要求隘膘,同時也提供exactly once功能疑故?有的,下面我們來看看Spark streaming弯菊。
Spark streaming
吞吐
Spark streaming采用小批量的方式纵势,提高了吞吐性能:
這里我們簡單展示Spark streaming的運(yùn)行機(jī)制,主要是與Storm做下對比管钳。Spark streaming批量讀取數(shù)據(jù)源中的數(shù)據(jù)钦铁,然后把每個batch轉(zhuǎn)化成內(nèi)部的RDD。Spark streaming以batch為單位進(jìn)行計(jì)算(默認(rèn)1s產(chǎn)生一個batch)才漆,而不是以record為單位牛曹,大大減少了ack所需的開銷,顯著提高了吞吐醇滥。
但也因?yàn)樘幚頂?shù)據(jù)的粒度變大黎比,導(dǎo)致Spark streaming的數(shù)據(jù)延時不如Storm,Spark streaming是秒級返回結(jié)果(與設(shè)置的batch間隔有關(guān))鸳玩,Storm則是毫秒級阅虫。
不重不丟(exactly once)
Spark streaming通過batch的方式提高了吞吐,但是同樣存在上面所說的數(shù)據(jù)丟失和重復(fù)的問題不跟。
在解答這個問題之前颓帝,我們先來了解一下一些概念:
1.at most once:最多消費(fèi)一次,會存在數(shù)據(jù)丟失
2.at least once:最少消費(fèi)一次窝革,保證數(shù)據(jù)不丟购城,但是有可能重復(fù)消費(fèi)
3.exactly once:精確一次,無論何種情況下聊闯,數(shù)據(jù)都只會消費(fèi)一次工猜,這是我們最希望看到的結(jié)果
大部分流式系統(tǒng)都提供了at most once和at least once功能,但不是所有系統(tǒng)都能提供exactly once菱蔬。
我們先看看Spark streaming的at least once是如何實(shí)現(xiàn)的篷帅,Spark streaming的每個batch可以看做是一個Spark任務(wù),receiver會先將數(shù)據(jù)寫入WAL拴泌,保證receiver宕機(jī)時魏身,從數(shù)據(jù)源獲取的數(shù)據(jù)能夠從日志中恢復(fù)(注意這里,早期的Spark streaming的receiver存在重復(fù)接收數(shù)據(jù)的情況)蚪腐,并且依賴RDD實(shí)現(xiàn)內(nèi)部的exactly once(可以簡單的理解采用批量計(jì)算的方式來實(shí)現(xiàn))箭昵。RDD:Resilient Distributed Dataset彈性分布式數(shù)據(jù)集,Spark保存著RDD之間的依賴關(guān)系回季,保證RDD計(jì)算失敗時家制,可以通過上游RDD進(jìn)行重新計(jì)算(RDD如何實(shí)現(xiàn)容錯這里就不解釋了正林,可以自行查資料)。
上面簡單解釋了Spark streaming依賴源數(shù)據(jù)寫WAL和自身RDD機(jī)制提供了容災(zāi)功能颤殴,保證at least once觅廓,但是依然無法保證exactly once,在回答這個問題前涵但,我們再來看一下杈绸,什么情況Spark streaming的數(shù)據(jù)會重復(fù)計(jì)算。
這里我們主要關(guān)注圖中的3個紅框:
Spark streaming的RDD機(jī)制只能保證內(nèi)部計(jì)算exactly once(圖中的1)矮瘟,但這是不夠的瞳脓,回想一下剛才Storm的例子,假如某個batch中澈侠,sink處一部分?jǐn)?shù)據(jù)已經(jīng)入庫劫侧,這時候某個sink節(jié)點(diǎn)宕機(jī),導(dǎo)致該節(jié)點(diǎn)處理的數(shù)據(jù)重復(fù)輸出(圖中的3埋涧,Storm處已經(jīng)解釋過了)板辽。還有另一種情況就是receiver處重復(fù)接收數(shù)據(jù)(圖中的2),我們看一下receiver重復(fù)接收數(shù)據(jù)的情況:
假如receiverA目前從kafka讀到pos=100的記錄棘催,并且已經(jīng)持久化到HDFS劲弦,但是由于網(wǎng)絡(luò)延遲沒有及時更新pos,此時receiverA宕機(jī)了醇坝,receiverB接管A的數(shù)據(jù)邑跪,并且后續(xù)的任務(wù)還會從pos=100處重新讀取,導(dǎo)致重復(fù)消費(fèi)呼猪。造成這種情況的主要原因就是:receiver處數(shù)據(jù)消費(fèi)和Kafka中position的更新沒有做到原子性画畅。
根據(jù)上面的討論,可以得出:一個流式系統(tǒng)如果要做到exactly once宋距,必須滿足3點(diǎn):
1.receiver處保證exactly once
2.流式系統(tǒng)自身保證exactly once
3.sink處保證exactly once
這里數(shù)據(jù)源采用Kafka舉例是因?yàn)镵afka作為目前主流的分布式消息隊(duì)列轴踱,比較有代表性。Kafka consumer的position可以保存在ZK或者Kafka中谚赎,也可以由consumer自己來保存淫僻。前者的話就可能存在數(shù)據(jù)消費(fèi)和position更新不一致的問題(因?yàn)闊o法保證原子性,也是之前Spark streaming采用的方式)壶唤,而采用后者的話雳灵,consumer可以采用事務(wù)更新的方式(寫本地或者采用事務(wù)的方式寫數(shù)據(jù)庫),保證數(shù)據(jù)消費(fèi)和position更新的原子性闸盔,從而實(shí)現(xiàn)exactly once(參考)悯辙。
Spark streaming 實(shí)現(xiàn) exactly once
Spark streaming1.3版本新添加了Kafka Direct API來實(shí)現(xiàn)數(shù)據(jù)接收的exactly once,本質(zhì)上就是上面提到的后者,Spark streaming自己維護(hù)position躲撰,streaming的worker直接從Kafka讀取數(shù)據(jù)针贬,position由Spark streaming管理,不再依賴ZK保存拢蛋,同時保證數(shù)據(jù)消費(fèi)和更新position的原子性坚踩,從而實(shí)現(xiàn)exactly once。
并且新的方式已經(jīng)不再需要receiver持久化數(shù)據(jù)瓤狐,因?yàn)镵afka本身就支持?jǐn)?shù)據(jù)持久化,可以避免receiver處持久化數(shù)據(jù)的開銷批幌,實(shí)現(xiàn)exactly once的同時也提高了性能础锐。
而sink處的exactly once的實(shí)現(xiàn)則視外部系統(tǒng)而定,比如文件系統(tǒng)本身就支持冪等(同一個操作執(zhí)行多次荧缘,不會改變之前的結(jié)果)皆警,同時Spark streaming也提供了api,用戶可以自己實(shí)現(xiàn)sink處的事務(wù)更新截粗,receiver信姓、sink和Spark streaming三者結(jié)合起來才能實(shí)現(xiàn)了真正的exactly once。
Storm trident本質(zhì)上也是采用了小批量的方式绸罗,并且也實(shí)現(xiàn)了exactly once語義意推,這里就不做過多討論菊值。
直到這里,我們了解到Spark streaming擁有較好的吞吐和exactly once語義儿子,解決了Storm一些不足柔逼,是不是只有采用類似Spark streaming這種小批量(micro-batch)的方式才能實(shí)現(xiàn)這些功能?答案是:NO儡毕。下面我們來看看Flink雷恃。
Flink
Flink在數(shù)據(jù)處理的方式上和Storm類似倒槐,并沒有采用小批量附井,是一個真正的流式系統(tǒng)永毅。它不僅擁有了不弱于Spark streaming的吞吐沼死,并且提供了exactly once語義。既然Flink也是逐條處理記錄,那么它是怎么做到的呢秀姐?跟上我的腳步...(下面內(nèi)容大部分參考官網(wǎng),撿重點(diǎn)的翻譯锥咸,想起來一個段子:如何快速成為業(yè)界大牛?答:翻譯英文文檔。速缨。hahaha旬牲,開個玩笑^ ^)
簡單來說仿粹,F(xiàn)link采用輕量級分布式快照實(shí)現(xiàn)容錯原茅,大致流程是:Flink不斷的對整個系統(tǒng)做snapshot,snapshot數(shù)據(jù)可以放在master上或外部系統(tǒng)(如HDFS)擂橘,假如發(fā)生故障時,F(xiàn)link停止整個數(shù)據(jù)流通贞,并選出最近完成的snapshot朗若,將整個數(shù)據(jù)流恢復(fù)到該snapshot那個時間點(diǎn)昌罩,snapshot本身比較輕量,而且用戶可以自行配置snapshot的間隔,snapshot的性能開銷對系統(tǒng)的影響很小(官方測試snapshot開啟前后的性能差距不大)。
barrier是分布式snapshot實(shí)現(xiàn)中一個非常核心的元素董饰,barrier和records一起在流式系統(tǒng)中傳輸啄栓,barrier是當(dāng)前snapshot和下一個snapshot的分界點(diǎn),它攜帶了當(dāng)前snapshot的id也祠,假設(shè)目前在做snapshot N昙楚,算子在發(fā)送barrier N之前,都會對當(dāng)前的狀態(tài)做checkpoint(checkpoint數(shù)據(jù)可以保存在外部系統(tǒng)中诈嘿,如HDFS)堪旧,checkpoint只包含了barrier N之前的數(shù)據(jù)狀態(tài),不會涉及barrier N之后的數(shù)據(jù)奖亚。
因?yàn)樗阕雍芏嗲闆r下需要接收多個算子的數(shù)據(jù)(shuffle操作)淳梦,所以只有當(dāng)所有上游的發(fā)送的barrier N都到達(dá)之后,算子才會將barrier N發(fā)送給下游(所有的下游)昔字。當(dāng)所有的sink算子都接收到barrier N之后爆袍,才會認(rèn)為該snapshot N成功完成。
為了保證一致性,需要遵守以下幾個原則:
1.一旦算子接收到某一個上游算子的barrier之后螃宙,它不能再處理該上游后面的數(shù)據(jù)蛮瞄,只有當(dāng)它所有上游算子的barrier都到達(dá),并將barrier發(fā)送給下游之后谆扎,才能繼續(xù)處理數(shù)據(jù)挂捅,否則的話會造成snapshot N和N+1的數(shù)據(jù)重疊。
2.某個上游算子的barrier到達(dá)之后堂湖,該上游算子后續(xù)的數(shù)據(jù)將會被緩存在input buffer中闲先。
3.一旦所有上游的算子的barrier都到達(dá)之后,該算子將數(shù)據(jù)和barrier發(fā)送給下游无蜂。
4.發(fā)送成功之后伺糠,該算子繼續(xù)處理input buffer中的數(shù)據(jù),并繼續(xù)接收處理上游算子發(fā)送過來的數(shù)據(jù)斥季。(有點(diǎn)啰嗦把低啊)
下面我們來看一個完整的snapshot流程圖:
圖片有點(diǎn)不清晰,可以自己去官網(wǎng)看酣倾,筆者比較蠢舵揭,怎么都截不下來高清的
圖中的Master保存了snapshot的狀態(tài),假設(shè)數(shù)據(jù)還是從Kafka中獲取躁锡,首先receiver算子會先將當(dāng)前的position發(fā)送給master午绳,記錄在snapshot中,并同時向下游發(fā)送barrier映之,下游的算子接收到barrier后拦焚,發(fā)起checkpoint操作,將當(dāng)前的狀態(tài)記錄在外部系統(tǒng)中杠输,并更新Master中snapshot狀態(tài)赎败,最后當(dāng)所有的sink算子都接收到barrier之后,更新snapshot中的狀態(tài)蠢甲,此時認(rèn)為該snapshot完成螟够。
通過這種輕量級的分布式snapshot方式,F(xiàn)link實(shí)現(xiàn)了exactly once峡钓,同時Flink也支持at least once妓笙,也就是算子不阻塞barrier已經(jīng)到達(dá)的上游算子的數(shù)據(jù)(多個上游算子的情況),這樣可以降低延遲能岩,但是不保證exactly once寞宫。
從圖中我們可以看出Kafka position也是由Flink自己維護(hù)的,所以能夠保證receiver處的exactly once拉鹃,sink處也同樣存在Spark streaming一樣的問題辈赋,exactly once依賴外部系統(tǒng)或需要用戶自己實(shí)現(xiàn)鲫忍。Flink官網(wǎng)給出了目前支持的Data Sources和Sinks以及容錯的粒度。
其中sink處采用Kafka的話不支持exactly once钥屈,個人猜想是不是因?yàn)樵缙诘腒afka producer沒有支持exactly once語義悟民,而導(dǎo)致Flink無法支持。Kafka0.11版本中添加了producer exactly once的支持篷就,是否后續(xù)能夠添加進(jìn)來射亏?
講到這里,我們可以了解到:
1.流式系統(tǒng)并不一定就是吞吐差的代名詞
2.流式系統(tǒng)也可以做到exactly once
就如Google流式系統(tǒng)負(fù)責(zé)人Tyler Akidau所說:一個設(shè)計(jì)良好的流式系統(tǒng)是能夠在吞吐完全媲美批量系統(tǒng)竭业,并且提供精準(zhǔn)的實(shí)時服務(wù)智润。(那是不是以后可以完全用流式系統(tǒng)取代批量系統(tǒng)?)
window和event time
Flink相比Spark streaming不僅提供了更低的延遲未辆,而且Flink還對window和event time提供了更好的支持窟绷。window和event time又是什么呢?
window
現(xiàn)實(shí)生活中咐柜,大部分?jǐn)?shù)據(jù)源其實(shí)是unbound data兼蜈,沒有邊界,我們沒有辦法得到一個最終的統(tǒng)計(jì)結(jié)果拙友,很多情況下我們會對固定時間間隔的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)饭尝,比如每5s統(tǒng)計(jì)一下服務(wù)器的qps,window機(jī)制能夠幫我們很好的完成這項(xiàng)需求献宫。
如圖(標(biāo)號代表事件發(fā)生的時間),流式系統(tǒng)會每隔5s創(chuàng)建一個window实撒,將該時間段的數(shù)據(jù)放入buffer姊途,累加后輸出結(jié)果。圖中0-5s產(chǎn)生的數(shù)據(jù)放在第一個window中(3s處有兩條數(shù)據(jù))知态,累加后輸出count=6捷兰。
window類型也有很多種,上圖是一個Tumbling Windows的例子负敏,另外還有Sliding Windows和session window贡茅,具體區(qū)別讀者可以自行查資料。
上圖是一個比較理想的示例圖其做,理想很豐滿顶考,現(xiàn)實(shí)很骨感,事情往往不盡如人意(情不自禁的都想唱起來了:人生已經(jīng)如此的艱難妖泄,有些事情就不要ao......流式系統(tǒng)的破事怎么這么多>匝亍!)蹈胡,直接按接收時間來劃分window可能會存在誤差:
假設(shè)由于網(wǎng)絡(luò)延遲渊季,應(yīng)該屬于第一個窗口的數(shù)據(jù)3延遲到達(dá)朋蔫,被分到了第二個窗口,這時候計(jì)算結(jié)果并不準(zhǔn)確却汉。怎么辦呢驯妄?
event time和process time:
假設(shè)一個流式系統(tǒng)目前正在接收并處理用戶手機(jī)的日志,但是由于網(wǎng)絡(luò)延遲合砂,或者用戶手機(jī)離線青扔,導(dǎo)致日志沒有及時發(fā)送到流式系統(tǒng),流式系統(tǒng)觀察到數(shù)據(jù)的時間和數(shù)據(jù)真正產(chǎn)生的時間可能存在偏差既穆,我們把數(shù)據(jù)真正產(chǎn)生的時間叫做:event time赎懦,把流式系統(tǒng)處理該數(shù)據(jù)的時間叫做:process time。
event time和process time往往會存在延遲幻工,這種不一致會導(dǎo)致數(shù)據(jù)亂序励两,如圖所示:藍(lán)色事件晚于黃色事件發(fā)生,但是事件的處理卻先于黃色事件囊颅。
早期的流式系統(tǒng)并沒有區(qū)分process time和event time当悔,往往將process time等同于event time。針對這一問題踢代,一個很直觀的解決方案就是:讓數(shù)據(jù)自身攜帶timestamp盲憎,該timestamp記錄該數(shù)據(jù)產(chǎn)生的時間,即為event time胳挎,流式系統(tǒng)按數(shù)據(jù)的event time來將數(shù)據(jù)分配到對應(yīng)的窗口饼疙,而不是按處理數(shù)據(jù)的時間。
window需要知道該窗口的數(shù)據(jù)都已經(jīng)全部到達(dá)慕爬,然后觸發(fā)計(jì)算邏輯窑眯,如何window判斷時間T之前的數(shù)據(jù)是否都已經(jīng)到達(dá)呢?
watermark
那就是引入watermark機(jī)制医窿,watermark同樣也攜帶一個時間戳磅甩,當(dāng)算子接收到watermark T后,就代表時間T之前的數(shù)據(jù)已經(jīng)接收完畢姥卢,不會再有小于時間T的數(shù)據(jù)卷要。
如圖:W(17)到達(dá)后,表示后續(xù)數(shù)據(jù)的時間戳不會小于17独榴。那可能有人會問了:那就是有一部分小于17的數(shù)據(jù)他喵的就是比w(17)還晚到了怎么辦僧叉?
watermark還會配合一個allow lateness參數(shù),window接收到watermark后棺榔,再等待一段時間才會關(guān)閉窗口彪标,如果這段時間有些數(shù)據(jù)依然沒有發(fā)送過來,那就只能忽略它們了(window的內(nèi)心os:我也嘗試過等待掷豺,但我還有更重要的事情要做)捞烟,而且考慮到流式系統(tǒng)的實(shí)時性薄声,假如可接受的時間內(nèi),數(shù)據(jù)沒有傳輸過來题画,那就算等到它過來再計(jì)算默辨,從實(shí)時性這個角度來說,這時計(jì)算的結(jié)果也有可能也已經(jīng)沒有意義了苍息。
Flink對window和watermark都提供了較好的支持缩幸,Spark streaming從2.0中也開始引入watermark功能,但是支持的功能有限竞思,并且真正的流式可以更優(yōu)雅表谊、簡單的實(shí)現(xiàn)window和watermark,從這個角度來看盖喷,F(xiàn)link是優(yōu)于Spark streaming的爆办。
總結(jié):
了解了Storm、Spark streaming课梳、Flink各自的特點(diǎn)后距辆,我們知道Storm提供了低延遲的計(jì)算,但是吞吐較低暮刃,并且無法保證exactly once(Storm trident采用batch的方式改善了這兩點(diǎn))跨算,Spark streaming通過小批量的方式保證了吞吐的情況下,同時提供了exactly once語義椭懊,但是實(shí)時性不如Storm诸蚕,而且由于采用micro-batch的方式,對window和event time的支持比較有限(Spark streaming2.0中引入了window和event time氧猬,還在起步階段)背犯。Flink采用分布式快照的方式實(shí)現(xiàn)了一個高吞吐、低延遲狂窑、支持exactly once的流式系統(tǒng),流式處理的方式也能更優(yōu)雅的支持window和event time桑腮。
當(dāng)然也不是說Flink一定就比Storm泉哈、Spark streaming好,沒有最好的框架破讨,只有最合適的框架丛晦,根據(jù)自身的業(yè)務(wù)、公司的技術(shù)儲備選擇最合適的框架才是正確的選擇提陶。
end
作者:奔跑的番茄醬
鏈接:http://www.reibang.com/p/16323566f3c6
來源:簡書
著作權(quán)歸作者所有烫沙。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處隙笆。