Spark Streaming 流式計算實戰(zhàn)

這篇文章由一次平安夜的微信分享整理而來。在Stuq 做的分享,原文內(nèi)容

業(yè)務(wù)場景

這次分享會比較實戰(zhàn)些扒吁。具體業(yè)務(wù)場景描述:

我們每分鐘會有幾百萬條的日志進(jìn)入系統(tǒng),我們希望根據(jù)日志提取出時間以及用戶名稱室囊,然后根據(jù)這兩個信息形成

  • userName/year/month/day/hh/normal
  • userName/year/month/day/hh/delay

路徑,存儲到HDFS中雕崩。如果我們發(fā)現(xiàn)日志產(chǎn)生的時間和到達(dá)的時間相差超過的一定的閾值,那么會放到 delay 目錄融撞,否則放在正常的 normal 目錄盼铁。

Spark Streaming 與 Storm 適用場景分析

為什么這里不使用 Storm呢? 我們初期確實想過使用 Storm 去實現(xiàn),然而使用 Storm 寫數(shù)據(jù)到HDFS比較麻煩:

* Storm 需要持有大量的 HDFS 文件句柄尝偎。需要落到同一個文件里的記錄是不確定什么時候會來的饶火,你不能寫一條就關(guān)掉,所以需要一直持有致扯。
* 需要使用HDFS 的寫文件的 append 模式肤寝,不斷追加記錄。

大量持有文件句柄以及在什么時候釋放這些文件句柄都是一件很困難的事情急前。另外使用 HDFS 的追加內(nèi)容模式也會有些問題醒陆。

后續(xù)我們就調(diào)研 Spark Streaming 。 Spark Streaming 有個好處裆针,我可以攢個一分鐘處理一次即可刨摩。這就意味著寺晌,我們可以隔一分鐘(你當(dāng)然也可以設(shè)置成五分鐘,十分鐘)批量寫一次集群澡刹,HDFS 對這種形態(tài)的文件存儲還是非常友好的呻征。這樣就很輕易的解決了 Storm 遇到的兩個問題。

實時性也能得到保證罢浇,譬如我的 batch interval 設(shè)置為 一分鐘 那么我們就能保證一分鐘左右的延遲 陆赋,事實上我們的業(yè)務(wù)場景是可以容忍半小時左右的。

當(dāng)然嚷闭,Spark 處理完數(shù)據(jù)后攒岛,如何落到集群是比較麻煩的一件事情,不同的記錄是要寫到不同的文件里面去的胞锰,沒辦法簡單的 saveAsTextFile 就搞定灾锯。這個我們通過自定義 Partitioner 來解決,第三個環(huán)節(jié)會告訴大家具體怎么做嗅榕。

上面大家其實可以看到 Spark Streaming 和 Storm 都作為流式處理的一個解決方案顺饮,但是在不同的場景下,其實有各自適合的時候凌那。

Spark Streaming 與 Kafka 集成方案選型

我們的數(shù)據(jù)來源是Kafka ,我們之前也有應(yīng)用來源于 HDFS文件系統(tǒng)監(jiān)控的,不過建議都盡量對接 Kafka 兼雄。

Spark Streaming 對接Kafka 做數(shù)據(jù)接受的方案有兩種:
*Receiver-based Approach
*Direct Approach (No Receivers)

兩個方案具體優(yōu)劣我專門寫了文章分析,大家晚點(diǎn)可以看看這個鏈接和 Spark Streaming 相關(guān)的文章帽蝶。

我的技術(shù)博文

我這里簡單描述下:
*Receiver-based Approach 內(nèi)存問題比較嚴(yán)重赦肋,因為她接受數(shù)據(jù)和處理數(shù)據(jù)是分開的。如果處理慢了嘲碱,它還是不斷的接受數(shù)據(jù)金砍。容易把負(fù)責(zé)接受的節(jié)點(diǎn)給搞掛了。
* Direct Approach 是直接把 Kafka 的 partition 映射成 RDD 里的 partition 麦锯。 所以數(shù)據(jù)還是在 kafka 恕稠。只有在算的時候才會從 Kafka 里拿,不存在內(nèi)存問題扶欣,速度也快鹅巍。

所以建議都是用 Direct Approach 。具體調(diào)用方式是這樣:


還是很簡單的料祠,之后就可以像正常的 RDD 一樣做處理了骆捧。

自定義 Partitioner 實現(xiàn)日志文件快速存儲到 HDFS

經(jīng)過處理完成后 ,我們拿到了logs 對象 髓绽。

到這一步位置敛苇,日志的每條記錄其實是一個 tuple(path,line) 也就是每一條記錄都會被標(biāo)記上一個路徑。那么現(xiàn)在要根據(jù)路徑顺呕,把每條記錄都寫到對應(yīng)的目錄去該怎么做呢枫攀?

一開始想到的做法是這樣:

首先收集到所有的路徑括饶。接著 for 循環(huán) paths ,然后過濾再進(jìn)行存儲,類似這樣:

這里我們一般會把 rdd 給 cache 住来涨,這樣每次都直接到內(nèi)存中過濾就行了图焰。但如果 path 成百上千個呢? 而且數(shù)據(jù)量一分鐘至少幾百萬蹦掐,save 到磁盤也是需要時間的技羔。所以這種方案肯定是不可行的。

我當(dāng)時還把 paths 循環(huán)給并行化了卧抗,然而當(dāng)前情況是 CPU 處理慢了藤滥,所以有改善,但是仍然達(dá)不到要求颗味。

這個時候你可能會想超陆,要是我能把每個路徑的數(shù)據(jù)都事先收集起來牺弹,得到幾個大的集合浦马,然后把這些集合并行的寫入到 HDFS 上就好了。事實上张漂,后面我實施的方案也確實是這樣的晶默。所謂集合的概念,其實就是 Partition 的概念航攒。而且這在Spark 中也是易于實現(xiàn)的磺陡,而實現(xiàn)的方式就是利用自定義 Partioner 。具體的方式如下:

通過上面的代碼漠畜,我們就得到了路徑和 partiton id 的對應(yīng)關(guān)系币他。接著遍歷 partition 就行了。對應(yīng)的 a 是分區(qū)號憔狞,b 則是分區(qū)的數(shù)據(jù)迭代器蝴悉。接著做對應(yīng)的寫入操作就行。這些分區(qū)寫入都是在各個 Executor 上執(zhí)行的瘾敢,并不是在 Driver 端拍冠,所以足夠快。

我簡單解釋下代碼 簇抵,首先我把收集到的路徑 zipWithIndex 這樣就把路徑和數(shù)字一一對應(yīng)了 庆杜;接著我新建了一個匿名類 實現(xiàn)了 Partitioner 。numPartitions 顯然就是我們的路徑集合的大小碟摆,遇到一個 key (其實就是個路徑)時晃财,則調(diào)用路徑和數(shù)字的映射關(guān)系 ,然后就把所有的數(shù)據(jù)根據(jù)路徑 hash 到不同的 partition 了 典蜕。接著遍歷 partition 就行了断盛,對應(yīng)的 a 是分區(qū)號雏逾,b 則是分區(qū)的數(shù)據(jù)迭代器。接著做對應(yīng)的寫入操作就行郑临。這些分區(qū)寫入都是在各個 Executor 上執(zhí)行的栖博,并不是在 Driver 端,所以足夠快厢洞。我們在測試集群上五分鐘大概 1000-2000w 數(shù)據(jù)仇让,90顆核,180G 內(nèi)存躺翻,平均處理時間大概是2分鐘左右丧叽。內(nèi)存可以再降降 我估計 100G 足夠了 。

在演示場景中公你,Spark Streaming 如何保證數(shù)據(jù)的完整性踊淳,不丟,不重

雖然 Spark Streaming 是作為一個24 * 7 不間斷運(yùn)行的程序來設(shè)計的陕靠,但是程序都會 crash ,那如果 crash 了迂尝,會不會導(dǎo)致數(shù)據(jù)丟失?會不會啟動后重復(fù)消費(fèi)剪芥?

關(guān)于這個垄开,我也有專門的文章闡述(http://www.reibang.com/p/885505daab29 ),

我這里直接給出結(jié)論:

* 使用 Direct Approach 模式
* 啟用 checkPoint 機(jī)制

做到上面兩步,就可以保證數(shù)據(jù)至少被消費(fèi)一次税肪。

那如何保證不重復(fù)消費(fèi)呢溉躲?

這個需要業(yè)務(wù)自己來保證。簡單來說益兄,業(yè)務(wù)有兩種:

* 冪等的
* 自己保證事務(wù)

所謂冪等操作就是重復(fù)執(zhí)行不會產(chǎn)生問題锻梳,如果是這種場景下,你不需要額外做任何工作净捅。但如果你的應(yīng)用場景是不允許數(shù)據(jù)被重復(fù)執(zhí)行的疑枯,那只能通過業(yè)務(wù)自身的邏輯代碼來解決了。

以當(dāng)前場景為例灸叼,就是典型的冪等 神汹,因為可以做寫覆蓋 ,

具體代碼如上 古今,那如何保證寫覆蓋呢屁魏?

文件名我采用了 job batch time 和 partition 的 id 作為名稱。這樣捉腥,如果假設(shè)系統(tǒng)從上一次失敗的 job 重新跑的時候氓拼,相同的內(nèi)容會被覆蓋寫,所以就不會存在重復(fù)的問題。

回顧

我們每分鐘會有幾百萬條的日志進(jìn)入系統(tǒng)桃漾,我們希望根據(jù)日志提取出時間以及用戶名稱坏匪,然后根據(jù)這兩個信息形成

  • userName/year/month/day/hh/normal
  • userName/year/month/day/hh/delay

路徑,存儲到HDFS中。如果我們發(fā)現(xiàn)日志產(chǎn)生的時間和到達(dá)的時間相差超過的一定的閾值撬统,那么會放到 delay 目錄适滓,否則放在正常的 normal 目錄。

我們作了四個方面的分析:

  1. Spark Streaming 與 Storm 適用場景分析 恋追;
  2. Spark Streaming 與 Kafka 集成方案選型凭迹,我們推薦Direct Approach 方案 ;
  3. 自定義 Partitioner 實現(xiàn)日志文件快速存儲到HDFS 苦囱;
  4. Spark Streaming 如何保證數(shù)據(jù)的完整性嗅绸,不丟,不重 撕彤。

好的 感謝大家 圣誕快樂 _

Q&A

Q1. spark streaming 可以直接在后面連上 elasticsearch 么鱼鸠?

A1. 可以的。透露下羹铅,我馬上也要做類似的實踐蚀狰。

Q2. 公司選用 storm 是由于它可以針對每條日志只做一次處理,spark streaming 可以做到么睦裳?

A2. spark streaming 是按時間周期的造锅, 需要攢一段時間,再一次性對獲得的所有數(shù)據(jù)做處理

Q3. 什么是文件句柄廉邑?

A3. HDFS 寫入 你需要持有對應(yīng)的文件的 client 。不可能來一條數(shù)據(jù)倒谷,就重新常見一個鏈接蛛蒙,然后用完就關(guān)掉。

Q4. Spark 分析流數(shù)據(jù)渤愁,分析好的數(shù)據(jù)怎么存到 mysql 比較好牵祟?

A4. 我沒有這個實踐過存儲到 MySQL。一般數(shù)據(jù)量比較大抖格,所以對接的會是 Reids/HBase/HDFS诺苹。

Q5. 有沒有嘗試過將數(shù)據(jù)寫入 hive?

A5. 沒有雹拄。但沒有問題的收奔。而且 Spark Streaming 里也可以使用 Spark SQL 。我不知道這會不會有幫助滓玖。

Q6. 冪等是什么坪哄?

A6. 就是反復(fù)操作不會有副作用。

Q7. 可不可以分享一下 spark 完整的應(yīng)用場景?

A7. 這個有點(diǎn)太大翩肌。 目前 spark 覆蓋了離線計算模暗,數(shù)據(jù)分析,機(jī)器學(xué)習(xí)念祭,圖計算兑宇,流式計算等多個領(lǐng)域,目標(biāo)也是一個通用的數(shù)據(jù)平臺粱坤,所以一般你想到的都能用 spark 解決顾孽。

Q8. 如何理解日志產(chǎn)生時間和到達(dá)時間相差超過一定的閾值?

A8. 每條日志都會帶上自己產(chǎn)生的時間比规。同時若厚,如果這條日志到我們的系統(tǒng)太晚了,我們就認(rèn)為這屬于延時日志蜒什。

Q9. 目前這套體系穩(wěn)定性如何测秸?會不會有經(jīng)常d節(jié)點(diǎn)的情況?

A9. 穩(wěn)定性確實有待商榷 建議小范圍嘗試灾常。

Q10. Spark Streaming 內(nèi)部是如何設(shè)計并解決 storm 存在的兩個問題的霎冯?老師能分析一下細(xì)節(jié)嗎?

A10. 這和 Spark Streaming 的設(shè)計是相關(guān)的钞瀑。微批處理模式使得我們可以一個周期打開所有文件句柄沈撞,然后直接寫入幾千萬條數(shù)據(jù),然后關(guān)閉雕什。第二個是使用 partition 并行加快寫入速度缠俺。

Q11. 如何應(yīng)對網(wǎng)絡(luò)抖動導(dǎo)致阻塞?

A11. Spark 本身有重試機(jī)制,還有各種超時機(jī)制贷岸。

Q12. 怎樣保證消息的及時性壹士?

A12. 依賴于數(shù)據(jù)源,kafka,Spark Streaming 是否處理能力充足偿警,沒有 delay . 所有環(huán)節(jié)都會影響消息的及時性躏救。

Q13. 實際運(yùn)用中,分析完的數(shù)據(jù)螟蒸,本身有很大的結(jié)構(gòu)關(guān)系盒使,有時又需要對數(shù)據(jù)二次補(bǔ)充,處理完的數(shù)據(jù)量不大七嫌,該選哪種存儲方式少办?

A13. 能用分布式存儲的就用分布式存儲〕可以不做更新的凡泣,盡量不做更新枉疼。我一般推薦對接到 HBase 。

Q14. Streaming 字面是流的意思鞋拟,倒是課程中提到對日志有延遲的考慮骂维,是 Spark Streaming 是自定一個周期,處理周期到達(dá)的數(shù)據(jù)集合贺纲,通俗講感覺像批處理航闺,不是每條記錄不一定要有時間戳?

A14. 你理解對了猴誊。每條記錄沒有時間戳潦刃。如果有,也是日志自己帶的懈叹。Spark Streaming 并不會給每條記錄帶上時間乖杠。

Q16. storm 避免重復(fù)是依賴 zookeeper,Spark Streaming 靠什么記錄處理到哪行呢澄成?

A16. 通過 checkpoint 機(jī)制胧洒,自己維護(hù)了 zookeeper 的偏移量。

Q17. 請問一下 Spark Streaming 處理日志數(shù)據(jù)的壓測結(jié)果如何呢墨状?

Q17. 剛剛說了卫漫,在我們的測試集群里, 1000-2000w 條記錄肾砂,平均處理時間大約2分鐘列赎,90顆核,180G 內(nèi)存镐确。沒有任何調(diào)優(yōu)參數(shù)包吝。理論內(nèi)存可以繼續(xù)降低,,因為不 cache 數(shù)據(jù) 辫塌。

Q18. AMQ 與他們之間區(qū)別和聯(lián)系漏策?

A18. AMQ 也是消息隊列? Spark Streaming 支持相當(dāng)多的消息隊列臼氨。

Q19. 國內(nèi) spark 集群部署在哪些云上?

A19. 沒有用過云芭届。

Q21. zookeeper 目前 hbase 都不想依賴它了储矩,因為會導(dǎo)致系統(tǒng)的不穩(wěn)定,請問老師怎么看褂乍?

A21. 還好吧持隧,產(chǎn)生問題主要是 client 太多。比如 hbase 依賴 zookeeper逃片,所有使用 hbase 的屡拨,都需要先和 zookeeper 建立連接,這對 zookeeper 產(chǎn)生較大的壓力。其他的系統(tǒng)也類似呀狼。如果共享 zookeeper 集群裂允,那么它的連接數(shù)會成為一個瓶頸。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末哥艇,一起剝皮案震驚了整個濱河市绝编,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌貌踏,老刑警劉巖十饥,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異祖乳,居然都是意外死亡逗堵,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門眷昆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蜒秤,“玉大人,你說我怎么就攤上這事隙赁】巡兀” “怎么了?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵伞访,是天一觀的道長掂骏。 經(jīng)常有香客問我,道長厚掷,這世上最難降的妖魔是什么弟灼? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮冒黑,結(jié)果婚禮上田绑,老公的妹妹穿的比我還像新娘。我一直安慰自己抡爹,他們只是感情好掩驱,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著冬竟,像睡著了一般欧穴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上泵殴,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天涮帘,我揣著相機(jī)與錄音,去河邊找鬼笑诅。 笑死调缨,一個胖子當(dāng)著我的面吹牛疮鲫,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播弦叶,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼俊犯,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了湾蔓?” 一聲冷哼從身側(cè)響起瘫析,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎默责,沒想到半個月后贬循,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡桃序,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年杖虾,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片媒熊。...
    茶點(diǎn)故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡奇适,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出芦鳍,到底是詐尸還是另有隱情嚷往,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布柠衅,位于F島的核電站皮仁,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏菲宴。R本人自食惡果不足惜贷祈,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望喝峦。 院中可真熱鬧势誊,春花似錦、人聲如沸谣蠢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽眉踱。三九已至勋颖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間勋锤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工侥祭, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留叁执,地道東北人茄厘。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像谈宛,于是被迫代替她去往敵國和親次哈。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容