性能優(yōu)化
為了獲得運(yùn)行在集群上的Spark應(yīng)用的最好的性能需要一些調(diào)優(yōu)。這一部分介紹了一些能夠改善應(yīng)用性能的一些參數(shù)和配置。從更高的角度看,你需要考慮兩件事情:
1、高效的使用集群的資源從而降低每個(gè)batch的數(shù)據(jù)處理的時(shí)間
2湾宙、設(shè)置合理的batch的大小從而數(shù)據(jù)使得數(shù)據(jù)處理的速度和接收速度一樣。
減少每個(gè)Batch的處理時(shí)間
為了減少每個(gè)batch的處理時(shí)間 是有許多可以優(yōu)化操作冈绊,稍后 Tuning Guide
中有討論侠鳄。這一節(jié)著重講述了比較重要的一些優(yōu)化。
數(shù)據(jù)接收的并行度
通過(guò)網(wǎng)絡(luò)接收到的數(shù)據(jù)需要數(shù)據(jù)在spark內(nèi)進(jìn)行反序列化和存儲(chǔ)死宣。如果數(shù)據(jù)的接收變成了系統(tǒng)的瓶頸伟恶,就需要考慮并行處理數(shù)據(jù)的接收。需要注意的是毅该,每個(gè)輸入流在worker的節(jié)點(diǎn)上創(chuàng)建了單獨(dú)的receiver博秫,它只會(huì)接收一個(gè)流的數(shù)據(jù)。接收多個(gè)數(shù)據(jù)流的花需要通過(guò)創(chuàng)建多個(gè)數(shù)據(jù)流并且配置它們使得能夠接收輸入流的不同分區(qū)的數(shù)據(jù)眶掌。舉個(gè)例子挡育,一個(gè)kafka的輸入DStream 接收兩個(gè)topic的數(shù)據(jù)可以其分成兩個(gè)stream,每個(gè)接收一個(gè)topic朴爬。這樣就會(huì)運(yùn)行兩個(gè)receiver即寒,這樣就會(huì)并行接收數(shù)據(jù),因而提高了整體的吞吐量。多個(gè)DStream 可以u(píng)nion成一個(gè)DStream,然后之前應(yīng)用于一個(gè)DStream的transformation 操作就可以應(yīng)用于被unioned的DStream上了蒿叠,操作入下:
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
另外一個(gè)需要考慮的就是 receiver的 block interval(區(qū)塊時(shí)長(zhǎng)暫時(shí)翻譯),這個(gè)由參數(shù)configuration parameter spark.streaming.blockInterval 決定
接收多個(gè)數(shù)據(jù)流數(shù)據(jù)另外一個(gè)選擇是使用inputStream.repartition(<number of partitions>) 明確的切分輸入數(shù)據(jù)流。它會(huì)在數(shù)據(jù)處理之前蚣常,把接收到的多個(gè)batch的數(shù)據(jù)分發(fā)到集群中的指定數(shù)量的機(jī)器上
數(shù)據(jù)處理的并行度
如果并行計(jì)算的task的數(shù)量不夠大的話市咽,集群的資源利用率是不高的。舉個(gè)例子抵蚊,分布式的reduce操作施绎,比如reduceByKey和reduceByKeyAndWindow,這個(gè)默認(rèn)的并行度是參數(shù) spark.default.parallelism 控制的贞绳。你可以作為參數(shù)傳入并行度或者配置這個(gè)參數(shù)來(lái)修改默認(rèn)值谷醉。
數(shù)據(jù)的序列化
數(shù)據(jù)序列化的壓力可以通過(guò)優(yōu)化序列化方式的方法解決降低。針對(duì)于streaming的情況冈闭,這里有兩種類(lèi)型的數(shù)據(jù)可以被序列化俱尼。
- InputData:默認(rèn)情況下,Receiver接收到的數(shù)據(jù)是存儲(chǔ)在executors的內(nèi)存內(nèi)的萎攒,存儲(chǔ)的級(jí)別是StorageLevel.MEMORY_AND_DISK_SER_2.也就是說(shuō)遇八,數(shù)據(jù)是被序列化成字節(jié)以減少剛才的壓力。并且會(huì)被復(fù)制以防executor失敗耍休。而且數(shù)據(jù)會(huì)優(yōu)先保存在內(nèi)存刃永,直到需要計(jì)算的數(shù)據(jù)在內(nèi)存已經(jīng)保存不下的時(shí)候才會(huì)寫(xiě)入到磁盤(pán)。這些序列化明顯過(guò)度耗費(fèi)資源羊精,reciiver必須反序列化接收到的數(shù)據(jù)并且重新序列化成spark序列化的方式斯够。
- Streaming操作產(chǎn)生的持久化的RDD streaming計(jì)算產(chǎn)生的RDD會(huì)被持久化在內(nèi)存中,舉個(gè)例子喧锦,窗口操作會(huì)在內(nèi)存中持久化這些數(shù)據(jù)以防數(shù)據(jù)需要多次被處理读规。當(dāng)時(shí)不同于SparkCore默認(rèn)的StorageLevel.MEMORY_ONLY,持久化的RDD是默認(rèn)是按照StorageLevel.MEMORY_ONLY_SER進(jìn)行持久化的。
以上兩種情形燃少,使用Kryo序列化會(huì)降低CPU和內(nèi)存的過(guò)度使用掖桦。
在某些特殊的情況下,比如spark不需要保持大量的數(shù)據(jù)供汛,持久化數(shù)據(jù)使用反序列化后的對(duì)象不會(huì)導(dǎo)致過(guò)度的gc壓力枪汪,所以也是一種可行的辦法。舉個(gè)例子怔昨,如果你在使用一個(gè)幾秒的batch并且沒(méi)有window的操作雀久,你可以顯示的設(shè)置storage的級(jí)別從而關(guān)閉序列化。這將會(huì)減少cpu因?yàn)樾蛄谢鴮?dǎo)致的壓力趁舀,從而提升性能赖捌。
啟動(dòng)的Task過(guò)多
如果每秒鐘啟動(dòng)的task的數(shù)量非常高(比如,每秒50或者更多),那么分發(fā)任務(wù)到slave上的壓力將會(huì)非常大越庇,并且將會(huì)使得要想獲得ms級(jí)別的延遲變得很難罩锐。這種壓力可以通過(guò)如下的改變降低:
執(zhí)行模式:執(zhí)行Spark使用standalone 模式或者粗粒度的 Mesos模式task的啟動(dòng)時(shí)間會(huì)優(yōu)于使用細(xì)粒度的Mesos的模式 可以參考 Running on Mesos guide
這個(gè)改變可以減少每個(gè)batch的時(shí)間到幾百ms,從而是的亞秒級(jí)的batchsize變得可行卤唉。
設(shè)置正確的batch間隔
為了保證運(yùn)行在集群上的spark應(yīng)用穩(wěn)定涩惑,必須保證數(shù)據(jù)處理的速度要達(dá)到數(shù)據(jù)接收的速度。換句話說(shuō)桑驱,每個(gè)batch處理數(shù)據(jù)的速度必須和產(chǎn)生的速度一致竭恬。是否能一致可以通過(guò)monitoring的web ui 上的處理時(shí)間看到。正常情況下熬的,處理時(shí)間要小于間隔時(shí)間痊硕。
取決于streaming計(jì)算的天然特征,對(duì)于固定資源的集群押框,batch的間隔對(duì)于數(shù)據(jù)在應(yīng)用中的保持率有重大影響岔绸。舉個(gè)例子,比如 WordCountNetwork橡伞,針對(duì)于特定的數(shù)據(jù)速率亭螟,系統(tǒng)可以支持每隔2s的單詞統(tǒng)計(jì),但是卻不能支持500ms的骑歹。因此预烙,batch的間隔時(shí)間需要設(shè)置成實(shí)際生產(chǎn)中需要保持的的期望的數(shù)據(jù)速率。
一個(gè)好方法就是 計(jì)算一個(gè)合適的batch的大小 去測(cè)試一 保守的batch間隔和一個(gè)比較低的數(shù)據(jù)速率道媚。為了驗(yàn)證系統(tǒng)是否能跟上數(shù)據(jù)的速率扁掸,你可以查看每個(gè)處理過(guò)的batch的端到端的處理延遲。如果delay的時(shí)間和batch的大小差不很多最域,那么系統(tǒng)就是穩(wěn)定谴分。否則的花,如果delay持續(xù)增加镀脂,意味著系統(tǒng)跟不上數(shù)據(jù)的速率從而變得不穩(wěn)定牺蹄。一旦你有兩個(gè)一個(gè)固定的配置,你就可以嘗試增加數(shù)據(jù)的速率或者減少batch的大小薄翅。需要注意的是沙兰,由于緩存數(shù)據(jù)增加導(dǎo)致的內(nèi)存增加是ok的,只要延遲時(shí)間降到一個(gè)很低的值翘魄。