性能優(yōu)化
要想使你的Spark流處理應(yīng)用能夠獲得更好地性能,你需要大量的優(yōu)化工作族购。在這一節(jié)中躬贡,我們提供了許多配置和參數(shù)來(lái)對(duì)你的程序進(jìn)行改進(jìn)。首先你需要從兩個(gè)方面出發(fā)來(lái)考慮優(yōu)化工作见坑。
- 通過(guò)有效的的使用集群資源來(lái)降低每個(gè)批次的數(shù)據(jù)處理時(shí)間。
- 設(shè)置一個(gè)合適的批次大小以便程序能夠盡量快的處理這些數(shù)據(jù)捏检。
降低每個(gè)批次的處理時(shí)間
在Spark中有許多方式可以降低每個(gè)批次的數(shù)據(jù)處理時(shí)間荞驴,你可以參考Tuning Guide,這部分提到了許多優(yōu)化要點(diǎn)贯城。
優(yōu)化數(shù)據(jù)接收并行度
通過(guò)網(wǎng)絡(luò)方式接收數(shù)據(jù)(Flame熊楼,Kafka等),要求把數(shù)據(jù)反序列化后存儲(chǔ)在Spark中能犯。如果數(shù)據(jù)接收是一個(gè)瓶頸孙蒙,那么我們就考慮采用多個(gè)receiver并行的方式。注意悲雳,每一個(gè)input DStream都可以創(chuàng)建一個(gè)單獨(dú)的receiver(創(chuàng)建在worker節(jié)點(diǎn)上)來(lái)接收獨(dú)立地接收流數(shù)據(jù)挎峦。因此,我們可以創(chuàng)建多個(gè)input DStream來(lái)同時(shí)接收多個(gè)數(shù)據(jù)源上的流數(shù)據(jù)合瓢。比如使用一個(gè)Kafka input DStream接收兩個(gè)topic的數(shù)據(jù)坦胶,你完全可以創(chuàng)建兩個(gè)input DStream來(lái)分別接受兩個(gè)topic的數(shù)據(jù),每一個(gè)receiver只負(fù)責(zé)從一個(gè)topic接收數(shù)據(jù)晴楔。這樣就是一個(gè)并行的方式顿苇,來(lái)增加數(shù)據(jù)吞吐量。并且這些DStream可以組合成意義DStream税弃,你定義在這個(gè)DStream上面的transformactions可以作用在每一個(gè)單獨(dú)的input DStream上面纪岁。
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
另一個(gè)需要考慮的參數(shù)是receiver的blocking interval,通過(guò)spark.streaming.blockInterval來(lái)定義则果。對(duì)于大多數(shù)的receiver來(lái)說(shuō)幔翰,會(huì)把接受到的數(shù)據(jù)合并成一個(gè)數(shù)據(jù)塊然后存儲(chǔ)到Spark的內(nèi)存中漩氨。每一個(gè)批次中數(shù)據(jù)塊的數(shù)量決定了要使用多少個(gè)任務(wù)來(lái)處理這些數(shù)據(jù)。每一個(gè)receiver的每一個(gè)batch上的task數(shù)量約等于(batch interval / block interval)遗增。比如叫惊,block interval為200ms,那么每2s的批次數(shù)據(jù)需要使用20個(gè)task來(lái)處理做修。如果task的數(shù)量太低(比如小于機(jī)器的內(nèi)核數(shù)量)霍狰,那么就不能夠使用全部的內(nèi)核來(lái)參與計(jì)算,導(dǎo)致效率降低饰及。為了增加每一個(gè)batch interval的task數(shù)量蔗坯,你可以縮小block interval。但是燎含,推薦使用最小的block interval是50ms宾濒,低于這個(gè)值的話接下來(lái)啟動(dòng)任務(wù)的開(kāi)銷(xiāo)將是一個(gè)問(wèn)題。
對(duì)于從多個(gè)input streams/receiver接受數(shù)據(jù)的選擇是重分配數(shù)據(jù)并行度(usinginputStream.repartition(<number of partitions>))瘫镇。這可以在對(duì)數(shù)據(jù)進(jìn)行處理之前鼎兽,指定每一個(gè)批次接收到的數(shù)據(jù)使用task的數(shù)量答姥。
優(yōu)化數(shù)據(jù)處理并行度
如果任何階段計(jì)算任務(wù)的并行度都不夠高铣除,那么會(huì)直接導(dǎo)致集群資源利用率低下(比如我們集群的計(jì)算資源為20個(gè)核心,但是每個(gè)階段的并行度都只有不到10個(gè)鹦付,那么就會(huì)導(dǎo)致分配的集群資源利用率低下j)尚粘。對(duì)于一些分布式reduce操作,比如reduceByKey和reduceByKeyAndWindow敲长,默認(rèn)的并行度通過(guò)spark.default.parallelism來(lái)控制郎嫁。你可以通過(guò)改變這個(gè)參數(shù)值來(lái)改變計(jì)算的并行度。
數(shù)據(jù)序列化
數(shù)據(jù)序列化的開(kāi)銷(xiāo)可以通過(guò)設(shè)置數(shù)據(jù)的序列化方式來(lái)優(yōu)化祈噪。在Spark Streaming中有兩種類(lèi)型的數(shù)據(jù)需要序列化泽铛。
- Input data - 默認(rèn)情況下接收器接收到的數(shù)據(jù)會(huì)使用StorageLevel.MEMORY_AND_DISK_SER_2來(lái)存儲(chǔ)在executors的內(nèi)存中。數(shù)據(jù)被序列化為bytes以便節(jié)約GC的開(kāi)銷(xiāo)辑鲤,和生成副本來(lái)容忍執(zhí)行器錯(cuò)誤盔腔。當(dāng)然數(shù)據(jù)首先會(huì)保存在內(nèi)存中,知道內(nèi)存不足以裝下需要參與流計(jì)算的數(shù)據(jù)月褥。顯示弛随,序列化是需要開(kāi)銷(xiāo)的,接收器必須反序列化接收到的數(shù)據(jù)然后再次通過(guò)Spaark定義的序列化格式對(duì)數(shù)據(jù)進(jìn)行序列化宁赤。
- Persisted RDDs generated by Streaming Operations - 通過(guò)計(jì)算得到的RDD會(huì)被持久化到內(nèi)存中舀透。譬如,窗口操作會(huì)把數(shù)據(jù)持久化到內(nèi)存中决左,因?yàn)檫@些數(shù)據(jù)需要計(jì)算多次愕够。然后走贪,與Spark Core默認(rèn)的存儲(chǔ)級(jí)別StorageLevel.MEMORY_ONLY不同,Streaming使用的默認(rèn)存儲(chǔ)級(jí)別為StorageLevel.MEMORY_ONLY_SER链烈,以便減小GC的開(kāi)銷(xiāo)厉斟。
在所以情況下,使用Kryo序列化方式可以有效地減小CPU和內(nèi)存的開(kāi)銷(xiāo)强衡。
在特定的情況下擦秽,可能流處理需要保存的數(shù)據(jù)總量不會(huì)太大。直接保存反序列化后的對(duì)象并不對(duì)產(chǎn)生過(guò)大的GC開(kāi)銷(xiāo)漩勤。如果你使用秒級(jí)的batch interval并且沒(méi)有任何窗口操作的話感挥,那么你可以通過(guò)設(shè)置storage level可以禁用以序列化方式持久化數(shù)據(jù)。這可以降低CPU對(duì)于序列化的開(kāi)銷(xiāo)越败,同時(shí)又不會(huì)帶來(lái)太大的GC開(kāi)銷(xiāo)触幼。
任務(wù)啟動(dòng)開(kāi)銷(xiāo)
如果每秒運(yùn)行的任務(wù)數(shù)量非常高(比如50+每秒),把任務(wù)發(fā)送到slaves時(shí)的開(kāi)銷(xiāo)會(huì)非常大究飞,這樣很難實(shí)現(xiàn)次秒級(jí)別的延時(shí)置谦。你可以通過(guò)以下方式來(lái)優(yōu)化。
- Execution Mode - 以Standalone模式或者coarse-grained Mesos模式可以獲得更好地任務(wù)運(yùn)行次數(shù)亿傅,比起fine-grained Mesos模式媒峡。
這種方式可以降低批處理時(shí)間100s of milliseconds,從而實(shí)現(xiàn)次秒級(jí)別的批處理葵擎。
合理的批次間隔
為了能夠保證Spark Streaming程序能夠穩(wěn)定的運(yùn)行在集群上谅阿,系統(tǒng)應(yīng)該盡可能快的處理接收到數(shù)據(jù)。話句話來(lái)說(shuō)酬滤,每個(gè)批次的數(shù)據(jù)都應(yīng)該在它生成后盡可能快的被處理签餐。這可以在Streaming Web UI中進(jìn)行監(jiān)控,每個(gè)批次的處理時(shí)間必須要小于每個(gè)批次的間隔時(shí)間盯串。
基于流處理的特性來(lái)說(shuō)氯檐,運(yùn)行在固定集群資源上的應(yīng)用,對(duì)于批次間隔的選取會(huì)嚴(yán)重影響數(shù)據(jù)處理效率体捏。舉個(gè)例子冠摄,讓我們重新考慮一下前面的例子WordCountNetwork。對(duì)于特定的數(shù)據(jù)速率译打,系統(tǒng)可能只能夠保證每?jī)擅肷梢淮卧~頻報(bào)告耗拓,而不是每500毫秒。所以批次間隔就必須設(shè)置成這樣奏司,以便可以持續(xù)的運(yùn)行乔询。
一個(gè)好讓你找到適合你程序的批次大小的方式是,使用一個(gè)保守的批次間隔時(shí)間(5-10s)和一個(gè)較低的數(shù)據(jù)速率來(lái)測(cè)試你的程序韵洋。為了測(cè)試系統(tǒng)是否可以保證這個(gè)數(shù)據(jù)速率竿刁,你可以檢查每一個(gè)批次處理的端到端的時(shí)間延遲(既可以在Spark driver的log4j日志中找到“Total delay”黄锤,也可以使用StreamingListener接口),如果延遲可以維持在一個(gè)與批次間隔差不多的水平食拜,那么系統(tǒng)就是穩(wěn)定的鸵熟。否則,如果延遲在不斷地增加负甸,那么也就意味著系統(tǒng)并不能穩(wěn)定運(yùn)行了流强。一旦你有了一個(gè)穩(wěn)定的配置,那么你可以嘗試加快數(shù)據(jù)速率和降低批次間隔呻待。注意打月,一個(gè)瞬間的延遲增大,可能只是短暫的數(shù)據(jù)率的增長(zhǎng)蚕捉,隨著數(shù)據(jù)率的下降奏篙,延遲會(huì)回到一個(gè)合理的水平。
內(nèi)存優(yōu)化
內(nèi)存調(diào)優(yōu)和GC策略會(huì)在Tuning Guide一節(jié)詳細(xì)討論迫淹,在這里我們只著重說(shuō)明對(duì)于StreamingContext相關(guān)的調(diào)優(yōu)參數(shù)秘通。
Spark Streaming應(yīng)用對(duì)于內(nèi)存的需求量很大程度上取決于你使用了什么樣的transformaction。如果你對(duì)過(guò)去10分鐘的數(shù)據(jù)使用了一個(gè)窗口操作敛熬,那么你的集群就需要足夠多的內(nèi)存來(lái)保存著10分鐘的數(shù)據(jù)肺稀。或者荸型,比如你使用了updateStateByKey方法來(lái)出來(lái)大量的key的數(shù)據(jù)集盹靴,那么你使用的內(nèi)存量肯定就會(huì)大炸茧。相反如果你只是用了一個(gè)簡(jiǎn)單的map-filter-store操作瑞妇,那么需要的內(nèi)存量就會(huì)小。
在通常情況下梭冠,因?yàn)閺慕邮掌鹘邮盏降臄?shù)據(jù)使用的存儲(chǔ)級(jí)別是StorageLevel.MEMORY_AND_DISK_SER_2辕狰,數(shù)據(jù)在內(nèi)存中裝不下就是放到硬盤(pán)中。這就會(huì)降低了流處理應(yīng)用的性能控漠,因此建議提供足夠的內(nèi)存來(lái)提高性能蔓倍。
內(nèi)存優(yōu)化的另一個(gè)方面是垃圾回收機(jī)制。因?yàn)榱魈幚沓绦蛐枰脱舆t盐捷,所以不希望JVM的垃圾回收影響程序的執(zhí)行偶翅。
下面展示 一些基于內(nèi)存和GC策略調(diào)優(yōu)的參數(shù)。
- Persistence Level of DStreams - 之前數(shù)據(jù)序列化這一章中已經(jīng)提到碉渡,接收器接收到的數(shù)據(jù)會(huì)被默認(rèn)序列化聚谁,這可以降低內(nèi)存使用和GC開(kāi)銷(xiāo)。使用Kyro方式進(jìn)行序列化可以進(jìn)一步地降低序列化后數(shù)據(jù)大大小和內(nèi)存消耗量滞诺。進(jìn)一步的降低內(nèi)存使用量可以使用壓縮(spark.rdd.compress)形导,但同時(shí)會(huì)增加CPU的開(kāi)銷(xiāo)环疼。
- Clearing old data - 默認(rèn)情況下,所有的輸入數(shù)據(jù)和通過(guò)DStream transformaction產(chǎn)生的持久化的RDD都會(huì)被自動(dòng)清理朵耕。Spark Streaming會(huì)根據(jù)使用的transformaction來(lái)決定何時(shí)清理數(shù)據(jù)炫隶。舉個(gè)例子,如果你使用一個(gè)10分鐘的窗口阎曹,那么程序會(huì)保留10分鐘的數(shù)據(jù)伪阶,然后自動(dòng)的清理老數(shù)據(jù)。當(dāng)然你可以通過(guò)設(shè)置streamingContext.remember參數(shù)來(lái)讓數(shù)據(jù)保留更長(zhǎng)的時(shí)間处嫌。
要點(diǎn)
- 一個(gè)DStream會(huì)關(guān)聯(lián)一個(gè)單獨(dú)的接收器望门,為了能夠并行運(yùn)行多個(gè)接收器,就要?jiǎng)?chuàng)建多個(gè)DStream锰霜。一個(gè)接收器需要一個(gè)executor筹误,占用一個(gè)內(nèi)核。為了確保在接收器占用了內(nèi)核后癣缅,還有足夠的內(nèi)核來(lái)進(jìn)行處理工作厨剪,你必須要再內(nèi)核分配的時(shí)候同時(shí)考慮到這兩部分。接收器以輪詢的方式來(lái)分配執(zhí)行器友存。
- 從流數(shù)據(jù)源接收到數(shù)據(jù)后祷膳,接收器會(huì)創(chuàng)建一個(gè)數(shù)據(jù)塊。每個(gè)block interval都會(huì)生成一個(gè)數(shù)據(jù)塊屡立。N個(gè)數(shù)據(jù)塊會(huì)被生成在一個(gè)batch interval內(nèi)(N=batchInterval/blockInterval)直晨。這些塊被塊管理器分發(fā)到其他執(zhí)行器的塊管理器上。在這之后膨俐,運(yùn)行在driver上的網(wǎng)絡(luò)接受追蹤器會(huì)被通知這些塊所在的位置勇皇,以便進(jìn)行下一個(gè)的數(shù)據(jù)處理。
- 在batch interval期間焚刺,driver會(huì)對(duì)這些塊創(chuàng)建一個(gè)RDD敛摘。在這個(gè)batch interval內(nèi)生成的快都是這個(gè)RDD的分區(qū)。每一個(gè)分區(qū)在Spark上都是一個(gè)任務(wù)乳愉。如果blockInterval== batchinterval兄淫,則意味著只有一個(gè)分區(qū),并且可能就直接在本地處理了蔓姚。
- 這些塊上的map任務(wù)都運(yùn)行在執(zhí)行器單元上(一個(gè)在接收數(shù)據(jù)塊的位置捕虽,另一個(gè)在數(shù)據(jù)塊被備份到的位置)。這可以讓block并不必關(guān)心block interval坡脐,除非是非本地調(diào)度泄私。較大的block interval會(huì)帶了更大的block,參數(shù)spark.locality.wait,能夠讓塊更可能在本地處理挖滤。你需要在這兩個(gè)參數(shù)間找到一種平衡崩溪,來(lái)能夠保證bigger block能夠在本地執(zhí)行。
-除了可以使用batch interval和block interval之外斩松,你還可以通過(guò)inputDstream.repartition(n)來(lái)定義分區(qū)數(shù)伶唯。這會(huì)對(duì)RDD中的數(shù)據(jù)進(jìn)行隨機(jī)重組,生成n個(gè)數(shù)據(jù)分區(qū)惧盹。為了更合理的分區(qū)數(shù)乳幸,你必須付出一個(gè)數(shù)據(jù)重組的代價(jià)。RDD的處理钧椰,都是作為一個(gè)job通過(guò)driver的jobscheduler來(lái)進(jìn)行調(diào)度的粹断。在一個(gè)給定的時(shí)間點(diǎn)只有一個(gè)job是活動(dòng)的。所以嫡霞,如果一個(gè)job是執(zhí)行中的瓶埋,那么其他job就是排隊(duì)中。 - 如果你有兩個(gè)DStream诊沪,那么就會(huì)形成兩個(gè)RDD养筒,也就會(huì)生成兩個(gè)Job,然后被一個(gè)接一個(gè)的調(diào)度端姚。為了避免這種情況晕粪,你可以對(duì)峙兩個(gè)DStream執(zhí)行union操作。這保證了兩個(gè)DStream RDD會(huì)產(chǎn)生一個(gè)unionRDD渐裸,這個(gè)unionRDD會(huì)當(dāng)做一個(gè)單獨(dú)的job巫湘。然而這對(duì)RDDs中的分區(qū)并沒(méi)有任何影響。
- 如果批次處理時(shí)間遠(yuǎn)大于批次間隔昏鹃,那么接收器的內(nèi)存會(huì)被塞滿尚氛,并且最終會(huì)拋出異常(最可能是BlockNotFoundException)。目前盆顾,我們還沒(méi)有方式來(lái)停止這個(gè)接收器怠褐,只能通過(guò)Spark配置spark.streaming.receiver.maxRate來(lái)限制接收器的數(shù)據(jù)接受率畏梆。