Spark-Streaming 文檔之性能調(diào)優(yōu)

性能優(yōu)化

要想使你的Spark流處理應(yīng)用能夠獲得更好地性能,你需要大量的優(yōu)化工作族购。在這一節(jié)中躬贡,我們提供了許多配置和參數(shù)來(lái)對(duì)你的程序進(jìn)行改進(jìn)。首先你需要從兩個(gè)方面出發(fā)來(lái)考慮優(yōu)化工作见坑。

  1. 通過(guò)有效的的使用集群資源來(lái)降低每個(gè)批次的數(shù)據(jù)處理時(shí)間。
  2. 設(shè)置一個(gè)合適的批次大小以便程序能夠盡量快的處理這些數(shù)據(jù)捏检。

降低每個(gè)批次的處理時(shí)間

在Spark中有許多方式可以降低每個(gè)批次的數(shù)據(jù)處理時(shí)間荞驴,你可以參考Tuning Guide,這部分提到了許多優(yōu)化要點(diǎn)贯城。

優(yōu)化數(shù)據(jù)接收并行度

通過(guò)網(wǎng)絡(luò)方式接收數(shù)據(jù)(Flame熊楼,Kafka等),要求把數(shù)據(jù)反序列化后存儲(chǔ)在Spark中能犯。如果數(shù)據(jù)接收是一個(gè)瓶頸孙蒙,那么我們就考慮采用多個(gè)receiver并行的方式。注意悲雳,每一個(gè)input DStream都可以創(chuàng)建一個(gè)單獨(dú)的receiver(創(chuàng)建在worker節(jié)點(diǎn)上)來(lái)接收獨(dú)立地接收流數(shù)據(jù)挎峦。因此,我們可以創(chuàng)建多個(gè)input DStream來(lái)同時(shí)接收多個(gè)數(shù)據(jù)源上的流數(shù)據(jù)合瓢。比如使用一個(gè)Kafka input DStream接收兩個(gè)topic的數(shù)據(jù)坦胶,你完全可以創(chuàng)建兩個(gè)input DStream來(lái)分別接受兩個(gè)topic的數(shù)據(jù),每一個(gè)receiver只負(fù)責(zé)從一個(gè)topic接收數(shù)據(jù)晴楔。這樣就是一個(gè)并行的方式顿苇,來(lái)增加數(shù)據(jù)吞吐量。并且這些DStream可以組合成意義DStream税弃,你定義在這個(gè)DStream上面的transformactions可以作用在每一個(gè)單獨(dú)的input DStream上面纪岁。

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

另一個(gè)需要考慮的參數(shù)是receiver的blocking interval,通過(guò)spark.streaming.blockInterval來(lái)定義则果。對(duì)于大多數(shù)的receiver來(lái)說(shuō)幔翰,會(huì)把接受到的數(shù)據(jù)合并成一個(gè)數(shù)據(jù)塊然后存儲(chǔ)到Spark的內(nèi)存中漩氨。每一個(gè)批次中數(shù)據(jù)塊的數(shù)量決定了要使用多少個(gè)任務(wù)來(lái)處理這些數(shù)據(jù)。每一個(gè)receiver的每一個(gè)batch上的task數(shù)量約等于(batch interval / block interval)遗增。比如叫惊,block interval為200ms,那么每2s的批次數(shù)據(jù)需要使用20個(gè)task來(lái)處理做修。如果task的數(shù)量太低(比如小于機(jī)器的內(nèi)核數(shù)量)霍狰,那么就不能夠使用全部的內(nèi)核來(lái)參與計(jì)算,導(dǎo)致效率降低饰及。為了增加每一個(gè)batch interval的task數(shù)量蔗坯,你可以縮小block interval。但是燎含,推薦使用最小的block interval是50ms宾濒,低于這個(gè)值的話接下來(lái)啟動(dòng)任務(wù)的開(kāi)銷(xiāo)將是一個(gè)問(wèn)題。

對(duì)于從多個(gè)input streams/receiver接受數(shù)據(jù)的選擇是重分配數(shù)據(jù)并行度(usinginputStream.repartition(<number of partitions>))瘫镇。這可以在對(duì)數(shù)據(jù)進(jìn)行處理之前鼎兽,指定每一個(gè)批次接收到的數(shù)據(jù)使用task的數(shù)量答姥。

優(yōu)化數(shù)據(jù)處理并行度

如果任何階段計(jì)算任務(wù)的并行度都不夠高铣除,那么會(huì)直接導(dǎo)致集群資源利用率低下(比如我們集群的計(jì)算資源為20個(gè)核心,但是每個(gè)階段的并行度都只有不到10個(gè)鹦付,那么就會(huì)導(dǎo)致分配的集群資源利用率低下j)尚粘。對(duì)于一些分布式reduce操作,比如reduceByKey和reduceByKeyAndWindow敲长,默認(rèn)的并行度通過(guò)spark.default.parallelism來(lái)控制郎嫁。你可以通過(guò)改變這個(gè)參數(shù)值來(lái)改變計(jì)算的并行度。

數(shù)據(jù)序列化

數(shù)據(jù)序列化的開(kāi)銷(xiāo)可以通過(guò)設(shè)置數(shù)據(jù)的序列化方式來(lái)優(yōu)化祈噪。在Spark Streaming中有兩種類(lèi)型的數(shù)據(jù)需要序列化泽铛。

  • Input data - 默認(rèn)情況下接收器接收到的數(shù)據(jù)會(huì)使用StorageLevel.MEMORY_AND_DISK_SER_2來(lái)存儲(chǔ)在executors的內(nèi)存中。數(shù)據(jù)被序列化為bytes以便節(jié)約GC的開(kāi)銷(xiāo)辑鲤,和生成副本來(lái)容忍執(zhí)行器錯(cuò)誤盔腔。當(dāng)然數(shù)據(jù)首先會(huì)保存在內(nèi)存中,知道內(nèi)存不足以裝下需要參與流計(jì)算的數(shù)據(jù)月褥。顯示弛随,序列化是需要開(kāi)銷(xiāo)的,接收器必須反序列化接收到的數(shù)據(jù)然后再次通過(guò)Spaark定義的序列化格式對(duì)數(shù)據(jù)進(jìn)行序列化宁赤。
  • Persisted RDDs generated by Streaming Operations - 通過(guò)計(jì)算得到的RDD會(huì)被持久化到內(nèi)存中舀透。譬如,窗口操作會(huì)把數(shù)據(jù)持久化到內(nèi)存中决左,因?yàn)檫@些數(shù)據(jù)需要計(jì)算多次愕够。然后走贪,與Spark Core默認(rèn)的存儲(chǔ)級(jí)別StorageLevel.MEMORY_ONLY不同,Streaming使用的默認(rèn)存儲(chǔ)級(jí)別為StorageLevel.MEMORY_ONLY_SER链烈,以便減小GC的開(kāi)銷(xiāo)厉斟。

在所以情況下,使用Kryo序列化方式可以有效地減小CPU和內(nèi)存的開(kāi)銷(xiāo)强衡。

在特定的情況下擦秽,可能流處理需要保存的數(shù)據(jù)總量不會(huì)太大。直接保存反序列化后的對(duì)象并不對(duì)產(chǎn)生過(guò)大的GC開(kāi)銷(xiāo)漩勤。如果你使用秒級(jí)的batch interval并且沒(méi)有任何窗口操作的話感挥,那么你可以通過(guò)設(shè)置storage level可以禁用以序列化方式持久化數(shù)據(jù)。這可以降低CPU對(duì)于序列化的開(kāi)銷(xiāo)越败,同時(shí)又不會(huì)帶來(lái)太大的GC開(kāi)銷(xiāo)触幼。

任務(wù)啟動(dòng)開(kāi)銷(xiāo)

如果每秒運(yùn)行的任務(wù)數(shù)量非常高(比如50+每秒),把任務(wù)發(fā)送到slaves時(shí)的開(kāi)銷(xiāo)會(huì)非常大究飞,這樣很難實(shí)現(xiàn)次秒級(jí)別的延時(shí)置谦。你可以通過(guò)以下方式來(lái)優(yōu)化。

  • Execution Mode - 以Standalone模式或者coarse-grained Mesos模式可以獲得更好地任務(wù)運(yùn)行次數(shù)亿傅,比起fine-grained Mesos模式媒峡。

這種方式可以降低批處理時(shí)間100s of milliseconds,從而實(shí)現(xiàn)次秒級(jí)別的批處理葵擎。

合理的批次間隔

為了能夠保證Spark Streaming程序能夠穩(wěn)定的運(yùn)行在集群上谅阿,系統(tǒng)應(yīng)該盡可能快的處理接收到數(shù)據(jù)。話句話來(lái)說(shuō)酬滤,每個(gè)批次的數(shù)據(jù)都應(yīng)該在它生成后盡可能快的被處理签餐。這可以在Streaming Web UI中進(jìn)行監(jiān)控,每個(gè)批次的處理時(shí)間必須要小于每個(gè)批次的間隔時(shí)間盯串。

基于流處理的特性來(lái)說(shuō)氯檐,運(yùn)行在固定集群資源上的應(yīng)用,對(duì)于批次間隔的選取會(huì)嚴(yán)重影響數(shù)據(jù)處理效率体捏。舉個(gè)例子冠摄,讓我們重新考慮一下前面的例子WordCountNetwork。對(duì)于特定的數(shù)據(jù)速率译打,系統(tǒng)可能只能夠保證每?jī)擅肷梢淮卧~頻報(bào)告耗拓,而不是每500毫秒。所以批次間隔就必須設(shè)置成這樣奏司,以便可以持續(xù)的運(yùn)行乔询。

一個(gè)好讓你找到適合你程序的批次大小的方式是,使用一個(gè)保守的批次間隔時(shí)間(5-10s)和一個(gè)較低的數(shù)據(jù)速率來(lái)測(cè)試你的程序韵洋。為了測(cè)試系統(tǒng)是否可以保證這個(gè)數(shù)據(jù)速率竿刁,你可以檢查每一個(gè)批次處理的端到端的時(shí)間延遲(既可以在Spark driver的log4j日志中找到“Total delay”黄锤,也可以使用StreamingListener接口),如果延遲可以維持在一個(gè)與批次間隔差不多的水平食拜,那么系統(tǒng)就是穩(wěn)定的鸵熟。否則,如果延遲在不斷地增加负甸,那么也就意味著系統(tǒng)并不能穩(wěn)定運(yùn)行了流强。一旦你有了一個(gè)穩(wěn)定的配置,那么你可以嘗試加快數(shù)據(jù)速率和降低批次間隔呻待。注意打月,一個(gè)瞬間的延遲增大,可能只是短暫的數(shù)據(jù)率的增長(zhǎng)蚕捉,隨著數(shù)據(jù)率的下降奏篙,延遲會(huì)回到一個(gè)合理的水平。

內(nèi)存優(yōu)化

內(nèi)存調(diào)優(yōu)和GC策略會(huì)在Tuning Guide一節(jié)詳細(xì)討論迫淹,在這里我們只著重說(shuō)明對(duì)于StreamingContext相關(guān)的調(diào)優(yōu)參數(shù)秘通。

Spark Streaming應(yīng)用對(duì)于內(nèi)存的需求量很大程度上取決于你使用了什么樣的transformaction。如果你對(duì)過(guò)去10分鐘的數(shù)據(jù)使用了一個(gè)窗口操作敛熬,那么你的集群就需要足夠多的內(nèi)存來(lái)保存著10分鐘的數(shù)據(jù)肺稀。或者荸型,比如你使用了updateStateByKey方法來(lái)出來(lái)大量的key的數(shù)據(jù)集盹靴,那么你使用的內(nèi)存量肯定就會(huì)大炸茧。相反如果你只是用了一個(gè)簡(jiǎn)單的map-filter-store操作瑞妇,那么需要的內(nèi)存量就會(huì)小。

在通常情況下梭冠,因?yàn)閺慕邮掌鹘邮盏降臄?shù)據(jù)使用的存儲(chǔ)級(jí)別是StorageLevel.MEMORY_AND_DISK_SER_2辕狰,數(shù)據(jù)在內(nèi)存中裝不下就是放到硬盤(pán)中。這就會(huì)降低了流處理應(yīng)用的性能控漠,因此建議提供足夠的內(nèi)存來(lái)提高性能蔓倍。

內(nèi)存優(yōu)化的另一個(gè)方面是垃圾回收機(jī)制。因?yàn)榱魈幚沓绦蛐枰脱舆t盐捷,所以不希望JVM的垃圾回收影響程序的執(zhí)行偶翅。

下面展示 一些基于內(nèi)存和GC策略調(diào)優(yōu)的參數(shù)。

  • Persistence Level of DStreams - 之前數(shù)據(jù)序列化這一章中已經(jīng)提到碉渡,接收器接收到的數(shù)據(jù)會(huì)被默認(rèn)序列化聚谁,這可以降低內(nèi)存使用和GC開(kāi)銷(xiāo)。使用Kyro方式進(jìn)行序列化可以進(jìn)一步地降低序列化后數(shù)據(jù)大大小和內(nèi)存消耗量滞诺。進(jìn)一步的降低內(nèi)存使用量可以使用壓縮(spark.rdd.compress)形导,但同時(shí)會(huì)增加CPU的開(kāi)銷(xiāo)环疼。
  • Clearing old data - 默認(rèn)情況下,所有的輸入數(shù)據(jù)和通過(guò)DStream transformaction產(chǎn)生的持久化的RDD都會(huì)被自動(dòng)清理朵耕。Spark Streaming會(huì)根據(jù)使用的transformaction來(lái)決定何時(shí)清理數(shù)據(jù)炫隶。舉個(gè)例子,如果你使用一個(gè)10分鐘的窗口阎曹,那么程序會(huì)保留10分鐘的數(shù)據(jù)伪阶,然后自動(dòng)的清理老數(shù)據(jù)。當(dāng)然你可以通過(guò)設(shè)置streamingContext.remember參數(shù)來(lái)讓數(shù)據(jù)保留更長(zhǎng)的時(shí)間处嫌。

要點(diǎn)

  • 一個(gè)DStream會(huì)關(guān)聯(lián)一個(gè)單獨(dú)的接收器望门,為了能夠并行運(yùn)行多個(gè)接收器,就要?jiǎng)?chuàng)建多個(gè)DStream锰霜。一個(gè)接收器需要一個(gè)executor筹误,占用一個(gè)內(nèi)核。為了確保在接收器占用了內(nèi)核后癣缅,還有足夠的內(nèi)核來(lái)進(jìn)行處理工作厨剪,你必須要再內(nèi)核分配的時(shí)候同時(shí)考慮到這兩部分。接收器以輪詢的方式來(lái)分配執(zhí)行器友存。
  • 從流數(shù)據(jù)源接收到數(shù)據(jù)后祷膳,接收器會(huì)創(chuàng)建一個(gè)數(shù)據(jù)塊。每個(gè)block interval都會(huì)生成一個(gè)數(shù)據(jù)塊屡立。N個(gè)數(shù)據(jù)塊會(huì)被生成在一個(gè)batch interval內(nèi)(N=batchInterval/blockInterval)直晨。這些塊被塊管理器分發(fā)到其他執(zhí)行器的塊管理器上。在這之后膨俐,運(yùn)行在driver上的網(wǎng)絡(luò)接受追蹤器會(huì)被通知這些塊所在的位置勇皇,以便進(jìn)行下一個(gè)的數(shù)據(jù)處理。
  • 在batch interval期間焚刺,driver會(huì)對(duì)這些塊創(chuàng)建一個(gè)RDD敛摘。在這個(gè)batch interval內(nèi)生成的快都是這個(gè)RDD的分區(qū)。每一個(gè)分區(qū)在Spark上都是一個(gè)任務(wù)乳愉。如果blockInterval== batchinterval兄淫,則意味著只有一個(gè)分區(qū),并且可能就直接在本地處理了蔓姚。
  • 這些塊上的map任務(wù)都運(yùn)行在執(zhí)行器單元上(一個(gè)在接收數(shù)據(jù)塊的位置捕虽,另一個(gè)在數(shù)據(jù)塊被備份到的位置)。這可以讓block并不必關(guān)心block interval坡脐,除非是非本地調(diào)度泄私。較大的block interval會(huì)帶了更大的block,參數(shù)spark.locality.wait,能夠讓塊更可能在本地處理挖滤。你需要在這兩個(gè)參數(shù)間找到一種平衡崩溪,來(lái)能夠保證bigger block能夠在本地執(zhí)行。
    -除了可以使用batch interval和block interval之外斩松,你還可以通過(guò)inputDstream.repartition(n)來(lái)定義分區(qū)數(shù)伶唯。這會(huì)對(duì)RDD中的數(shù)據(jù)進(jìn)行隨機(jī)重組,生成n個(gè)數(shù)據(jù)分區(qū)惧盹。為了更合理的分區(qū)數(shù)乳幸,你必須付出一個(gè)數(shù)據(jù)重組的代價(jià)。RDD的處理钧椰,都是作為一個(gè)job通過(guò)driver的jobscheduler來(lái)進(jìn)行調(diào)度的粹断。在一個(gè)給定的時(shí)間點(diǎn)只有一個(gè)job是活動(dòng)的。所以嫡霞,如果一個(gè)job是執(zhí)行中的瓶埋,那么其他job就是排隊(duì)中。
  • 如果你有兩個(gè)DStream诊沪,那么就會(huì)形成兩個(gè)RDD养筒,也就會(huì)生成兩個(gè)Job,然后被一個(gè)接一個(gè)的調(diào)度端姚。為了避免這種情況晕粪,你可以對(duì)峙兩個(gè)DStream執(zhí)行union操作。這保證了兩個(gè)DStream RDD會(huì)產(chǎn)生一個(gè)unionRDD渐裸,這個(gè)unionRDD會(huì)當(dāng)做一個(gè)單獨(dú)的job巫湘。然而這對(duì)RDDs中的分區(qū)并沒(méi)有任何影響。
  • 如果批次處理時(shí)間遠(yuǎn)大于批次間隔昏鹃,那么接收器的內(nèi)存會(huì)被塞滿尚氛,并且最終會(huì)拋出異常(最可能是BlockNotFoundException)。目前盆顾,我們還沒(méi)有方式來(lái)停止這個(gè)接收器怠褐,只能通過(guò)Spark配置spark.streaming.receiver.maxRate來(lái)限制接收器的數(shù)據(jù)接受率畏梆。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末您宪,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子奠涌,更是在濱河造成了極大的恐慌宪巨,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,324評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件溜畅,死亡現(xiàn)場(chǎng)離奇詭異捏卓,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)怠晴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)遥金,“玉大人,你說(shuō)我怎么就攤上這事蒜田「逍担” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,328評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵冲粤,是天一觀的道長(zhǎng)美莫。 經(jīng)常有香客問(wèn)我,道長(zhǎng)梯捕,這世上最難降的妖魔是什么厢呵? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,147評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮傀顾,結(jié)果婚禮上襟铭,老公的妹妹穿的比我還像新娘。我一直安慰自己短曾,他們只是感情好蝌矛,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著错英,像睡著了一般入撒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上椭岩,一...
    開(kāi)封第一講書(shū)人閱讀 51,115評(píng)論 1 296
  • 那天茅逮,我揣著相機(jī)與錄音,去河邊找鬼判哥。 笑死献雅,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的塌计。 我是一名探鬼主播,決...
    沈念sama閱讀 40,025評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼锌仅,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了热芹?” 一聲冷哼從身側(cè)響起贱傀,我...
    開(kāi)封第一講書(shū)人閱讀 38,867評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤府寒,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體剖淀,經(jīng)...
    沈念sama閱讀 45,307評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評(píng)論 2 332
  • 正文 我和宋清朗相戀三年祷蝌,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片巨朦。...
    茶點(diǎn)故事閱讀 39,688評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖糊啡,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情吁津,我是刑警寧澤棚蓄,帶...
    沈念sama閱讀 35,409評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站碍脏,受9級(jí)特大地震影響梭依,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜典尾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評(píng)論 3 325
  • 文/蒙蒙 一役拴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧钾埂,春花似錦河闰、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,657評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至髓考,卻和暖如春部念,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背氨菇。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,811評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工儡炼, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人门驾。 一個(gè)月前我還...
    沈念sama閱讀 47,685評(píng)論 2 368
  • 正文 我出身青樓射赛,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親奶是。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容