Spark Streaming 的玫瑰與刺

前言

說人話:其實(shí)就是講Spark Streaming 的好處與坑。好處主要從一些大的方面講漓糙,坑則是從實(shí)際場(chǎng)景中遇到的一些小細(xì)節(jié)描述铣缠。

玫瑰篇

玫瑰篇主要是說Spark Streaming的優(yōu)勢(shì)點(diǎn)。

玫瑰之代碼復(fù)用

這主要得益于Spark的設(shè)計(jì)昆禽,以及平臺(tái)的全面性蝗蛙。你寫的流處理的代碼可以很方便的適用于Spark平臺(tái)上的批處理,交互式處理醉鳖。因?yàn)樗麄儽旧矶际腔赗DD模型的捡硅,并且Spark Streaming的設(shè)計(jì)者也做了比較好的封裝和兼容。所以我說RDD是個(gè)很強(qiáng)大的框盗棵,能把各種場(chǎng)景都給框住壮韭,這就是高度抽象和思考后的結(jié)果北发。

玫瑰之機(jī)器學(xué)習(xí)

如果你使用Spark MLlib 做模型訓(xùn)練。恭喜你喷屋,首先是很多算法已經(jīng)支持Spark Streaming琳拨,譬如k-means 就支持流式數(shù)據(jù)更新模型。 其次屯曹,你也可以在Spark Streaming中直接將離線計(jì)算好的模型load進(jìn)來狱庇,然后對(duì)新進(jìn)來的數(shù)據(jù)做實(shí)時(shí)的Predict操作。

玫瑰之SQL支持

Spark Streaming 里天然就可以使用 sql/dataframe/datasets 等恶耽。而且時(shí)間窗口的使用可以極大擴(kuò)展這種使用場(chǎng)景密任,譬如各種系統(tǒng)預(yù)警等。類似Storm則需要額外的開發(fā)與支持偷俭。

玫瑰之吞吐和實(shí)時(shí)的有效控制

Spark Streaming 可以很好的控制實(shí)時(shí)的程度(小時(shí)批什,分鐘,秒)社搅。極端情況可以設(shè)置到毫秒驻债。

玫瑰之概述

Spark Streaming 可以很好的和Spark其他組件進(jìn)行交互,獲取其支持形葬。同時(shí)Spark 生態(tài)圈的快速發(fā)展合呐,亦能從中受益。

刺篇

刺篇就是描述Spark Streaming 的一些問題笙以,做選型前關(guān)注這些問題可以有效的降低使用風(fēng)險(xiǎn)淌实。

checkpoint 之刺

checkpoint 是個(gè)很好的恢復(fù)機(jī)制。但是方案比較粗暴猖腕,直接通過序列化的機(jī)制寫入到文件系統(tǒng)拆祈,導(dǎo)致代碼變更和配置變更無法生效。實(shí)際場(chǎng)景是升級(jí)往往比系統(tǒng)崩潰的頻率高太多倘感。但是升級(jí)需要能夠無縫的銜接上一次的偏移量放坏。所以spark streaming在無法容忍數(shù)據(jù)有丟失的情況下,你需要自己記錄偏移量老玛,然后從上一次進(jìn)行恢復(fù)淤年。

我們目前是重寫了相關(guān)的代碼,每次記錄偏移量蜡豹,不過只有在升級(jí)的時(shí)候才會(huì)讀取自己記錄的偏移量麸粮,其他情況都是依然采用checkpoint機(jī)制。

Kafka 之刺

這個(gè)和Spark Streaming相關(guān)镜廉,也不太相關(guān)弄诲。說相關(guān)是因?yàn)镾park 對(duì)很多異常處理比較簡(jiǎn)單。很多是和Kafka配置相關(guān)的娇唯。我舉個(gè)例子:

如果消息體太大了齐遵,超過 fetch.message.max.bytes=1m,那么Spark Streaming會(huì)直接拋出OffsetOutOfRangeException異常凤巨,然后停止服務(wù)。

對(duì)應(yīng)的錯(cuò)誤會(huì)從這行代碼拋出:

if (!iter.hasNext) {
        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
        finished = true
        null.asInstanceOf[R]
      }

其實(shí)就是消費(fèi)的完成后 實(shí)際的消費(fèi)數(shù)據(jù)量和預(yù)先估計(jì)的量不一致洛搀。

你在日志中看到的信息其實(shí)是這個(gè)代碼答應(yīng)出來的:

private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
    s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
    s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
    " This should not happen, and indicates that messages may have been lost"

解決辦法自然是把 fetch.message.max.bytes 設(shè)置大些敢茁。

如果你使用Spark Streaming去追數(shù)據(jù),從頭開始消費(fèi)kafka,而Kafka因?yàn)槟撤N原因留美,老數(shù)據(jù)快速的被清理掉彰檬,也會(huì)引發(fā)OffsetOutOfRangeException錯(cuò)誤。并且使得Spark Streaming程序異常的終止谎砾。

解決辦法是事先記錄kafka偏移量和時(shí)間的關(guān)系(可以隔幾秒記錄一次)逢倍,然后根據(jù)時(shí)間找到一個(gè)較大的偏移量開始消費(fèi)。

或者你根據(jù)目前Kafka新增數(shù)據(jù)的消費(fèi)速度景图,給smallest獲取到的偏移量再加一個(gè)較大的值较雕,避免出現(xiàn)Spark Streaming 在fetch的時(shí)候數(shù)據(jù)不存在的情況。

Kafka partition 映射 RDD partition 之刺

Kafka的分區(qū)數(shù)決定了你的并行度(我們假設(shè)你使用Direct Approach的模式集成)挚币。為了獲得更大的并行度亮蒋,則需要進(jìn)行一次repartition,而repartition 就意味著需要發(fā)生Shuffle,在流式計(jì)算里妆毕,可能會(huì)消耗掉我們寶貴的時(shí)間慎玖。 為了能夠避免Shuffle,并且提高Spark Streaming處理的并行度,我們重寫了 DirectKafkaInputDStream,KafkaRDD,KafkaUtils等類笛粘,實(shí)現(xiàn)了一個(gè)Kafka partition 可以映射為多個(gè)RDD partition的功能趁怔。譬如你有M個(gè)Kafka partitions,則可映射成 M*N個(gè) RDD partitions。 其中N 為>1 的正整數(shù)薪前。

我們期望官方能夠?qū)崿F(xiàn)將一個(gè)Kafka的partitions 映射為多個(gè)Spark 的partitions,避免發(fā)生Shuffle而導(dǎo)致多次的數(shù)據(jù)移動(dòng)润努。

textFileStream

其實(shí)使用textFileStream 的人應(yīng)該也不少。因?yàn)榭梢院芊奖愕谋O(jiān)控HDFS上某個(gè)文件夾下的文件示括,并且進(jìn)行計(jì)算铺浇。這里我們遇到的一個(gè)問題是,如果底層比如是壓縮文件例诀,遇到有順壞的文件随抠,你是跳不過去的裁着,直接會(huì)讓Spark Streaming 異常退出繁涂。 官方并沒有提供合適的方式讓你跳過損壞的文件。
以NewHadoopRDD為例二驰,里面有這么幾行代碼扔罪,獲取一條新的數(shù)據(jù):


override def hasNext: Boolean = {
        if (!finished && !havePair) {
         //下面這行是問題的所在點(diǎn)
          finished = !reader.nextKeyValue 
          if (finished) {          
            close()
          }
          havePair = !finished
        }
        !finished
      }

通過reader 獲取下一條記錄的時(shí)候,譬如是一個(gè)損壞的gzip文件桶雀,可能就會(huì)拋出異常矿酵,而這個(gè)異常是用戶catch不到的唬复,直接讓Spark Streaming程序掛掉了。

而在 HadoopRDD類中全肮,對(duì)應(yīng)的實(shí)現(xiàn)如下:

override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case eof: EOFException =>
            finished = true
        }
        if (!finished) {
          inputMetrics.incRecordsRead(1)
        }
        (key, value)
      }

這里好歹做了個(gè)EOFException敞咧。然而,如果是一個(gè)壓縮文件辜腺,解壓的時(shí)候就直接產(chǎn)生錯(cuò)誤了休建,一般而言是 IOException,而不是EOFException了,這個(gè)時(shí)候也就歇菜了评疗。

個(gè)人認(rèn)為應(yīng)該添加一些配置测砂,允許用戶可以選擇如何對(duì)待這種有損壞或者無法解壓的文件。

因?yàn)楝F(xiàn)階段我們并沒有維護(hù)一個(gè)Spark的私有版本百匆,所以是通過重寫FileInputDStream,NewHadoopRDD 等相關(guān)類來修正該問題砌些。

Shuffle 之刺

Shuffle (尤其是每個(gè)周期數(shù)據(jù)量很大的情況)是Spark Streaming 不可避免的疼痛,尤其是數(shù)據(jù)量極大的情況,因?yàn)镾park Streaming對(duì)處理的時(shí)間是有限制的加匈。我們有一個(gè)場(chǎng)景存璃,是五分鐘一個(gè)周期,我們僅僅是做了一個(gè)repartion雕拼,耗時(shí)就達(dá)到2.1分鐘(包括到Kafka取數(shù)據(jù))∮星桑現(xiàn)階段Spark 的Shuffle實(shí)現(xiàn)都需要落磁盤,并且Shuffle Write 和 Shuffle Read 階段是完全分開悲没,后者必須等到前者都完成才能開始工作篮迎。我認(rèn)為Spark Streaming有必要單獨(dú)開發(fā)一個(gè)更快速,完全基于內(nèi)存的Shuffle方案示姿。

內(nèi)存之刺

在Spark Streaming中甜橱,你也會(huì)遇到在Spark中常見的問題,典型如Executor Lost 相關(guān)的問題(shuffle fetch 失敗栈戳,Task失敗重試等)岂傲。這就意味著發(fā)生了內(nèi)存不足或者數(shù)據(jù)傾斜的問題。這個(gè)目前你需要考慮如下幾個(gè)點(diǎn)以期獲得解決方案:

  1. 相同資源下子檀,增加partition數(shù)可以減少內(nèi)存問題镊掖。 原因如下:通過增加partition數(shù),每個(gè)task要處理的數(shù)據(jù)少了褂痰,同一時(shí)間內(nèi)亩进,所有正在
    運(yùn)行的task要處理的數(shù)量少了很多,所有Executor占用的內(nèi)存也變小了缩歪。這可以緩解數(shù)據(jù)傾斜以及內(nèi)存不足的壓力归薛。

  2. 關(guān)注shuffle read 階段的并行數(shù)。例如reduce,group 之類的函數(shù),其實(shí)他們都有第二個(gè)參數(shù)主籍,并行度(partition數(shù))习贫,只是大家一般都不設(shè)置。不過出了問題再設(shè)置一下千元,也不錯(cuò)苫昌。

  3. 給一個(gè)Executor 核數(shù)設(shè)置的太多,也就意味著同一時(shí)刻幸海,在該Executor 的內(nèi)存壓力會(huì)更大蜡歹,GC也會(huì)更頻繁。我一般會(huì)控制在3個(gè)左右涕烧。然后通過提高Executor數(shù)量來保持資源的總量不變月而。

監(jiān)控之刺

Spark Streaming 的UI 上的Executors Tab缺少一個(gè)最大的監(jiān)控,就是Worker內(nèi)存GC詳情议纯。雖然我們可以將這些信息導(dǎo)入到 第三方監(jiān)控中父款,然而終究是不如在 Spark UI上展現(xiàn)更加方便。 為此我們也將該功能列入研發(fā)計(jì)劃瞻凤。

總結(jié)

目前Spark Streaming 可以應(yīng)對(duì)的場(chǎng)景不少憨攒,但是在很多場(chǎng)景上,還是有這樣那樣的問題阀参。建議調(diào)研后都進(jìn)一步做測(cè)試再做出是否遷移到該平臺(tái)的決定肝集。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市蛛壳,隨后出現(xiàn)的幾起案子杏瞻,更是在濱河造成了極大的恐慌,老刑警劉巖衙荐,帶你破解...
    沈念sama閱讀 207,248評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件捞挥,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡忧吟,警方通過查閱死者的電腦和手機(jī)砌函,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來溜族,“玉大人讹俊,你說我怎么就攤上這事』褪悖” “怎么了仍劈?”我有些...
    開封第一講書人閱讀 153,443評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)摧玫。 經(jīng)常有香客問我耳奕,道長(zhǎng)绑青,這世上最難降的妖魔是什么诬像? 我笑而不...
    開封第一講書人閱讀 55,475評(píng)論 1 279
  • 正文 為了忘掉前任屋群,我火速辦了婚禮,結(jié)果婚禮上坏挠,老公的妹妹穿的比我還像新娘芍躏。我一直安慰自己,他們只是感情好降狠,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評(píng)論 5 374
  • 文/花漫 我一把揭開白布对竣。 她就那樣靜靜地躺著,像睡著了一般榜配。 火紅的嫁衣襯著肌膚如雪否纬。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,185評(píng)論 1 284
  • 那天蛋褥,我揣著相機(jī)與錄音临燃,去河邊找鬼。 笑死烙心,一個(gè)胖子當(dāng)著我的面吹牛膜廊,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播淫茵,決...
    沈念sama閱讀 38,451評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼爪瓜,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了匙瘪?” 一聲冷哼從身側(cè)響起铆铆,我...
    開封第一講書人閱讀 37,112評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎丹喻,沒想到半個(gè)月后算灸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,609評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡驻啤,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評(píng)論 2 325
  • 正文 我和宋清朗相戀三年菲驴,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片骑冗。...
    茶點(diǎn)故事閱讀 38,163評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡赊瞬,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出贼涩,到底是詐尸還是另有隱情巧涧,我是刑警寧澤,帶...
    沈念sama閱讀 33,803評(píng)論 4 323
  • 正文 年R本政府宣布遥倦,位于F島的核電站谤绳,受9級(jí)特大地震影響占锯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜缩筛,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評(píng)論 3 307
  • 文/蒙蒙 一消略、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧瞎抛,春花似錦艺演、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至断凶,卻和暖如春伤提,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背认烁。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評(píng)論 1 261
  • 我被黑心中介騙來泰國打工肿男, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人砚著。 一個(gè)月前我還...
    沈念sama閱讀 45,636評(píng)論 2 355
  • 正文 我出身青樓次伶,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親稽穆。 傳聞我的和親對(duì)象是個(gè)殘疾皇子冠王,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容