秒級(jí)風(fēng)控spark優(yōu)化
背景:在aws和qq同時(shí)存在時(shí)辫愉,兩邊分開計(jì)算,數(shù)據(jù)量不大将硝,任務(wù)不會(huì)出現(xiàn)延遲恭朗,全遷移到qq之后,所以數(shù)據(jù)全在一個(gè)集群中處理依疼,延時(shí)非常嚴(yán)重痰腮,沒(méi)辦法做到實(shí)時(shí)風(fēng)控?cái)r截
調(diào)優(yōu)后配置如下:
1.控制sparkstreaming 消費(fèi)能力,防止任務(wù)計(jì)算不完發(fā)生堆積或內(nèi)存溢出
機(jī)制1:
背壓機(jī)制律罢,sparkstreaming 支持根據(jù)響應(yīng)時(shí)間動(dòng)態(tài)控制接入消息數(shù)膀值,配置如下
spark.streaming.backpressure.enabled=true(開啟背壓)
spark.streaming.backpressure.pid.minRate=400(最少接入消息數(shù))
機(jī)制2:
spark.streaming.kafka.maxRatePerPartition(控制batch類最多消費(fèi)多少條消息,分區(qū)個(gè)數(shù)*spark.streaming.kafka.maxRatePerPartition就是接入的總消息數(shù))
背壓效果如下:
背壓機(jī)制較機(jī)制2更靈活,且能很好結(jié)合資源動(dòng)態(tài)伸縮
2.廣播機(jī)制
敏感接口列表沧踏,用廣播機(jī)制替代實(shí)時(shí)查詢r(jià)edis(這里要注意廣播出去的變量都必須是支持序列化的歌逢,redis類庫(kù)返回的set是不支持序列化的,運(yùn)行過(guò)程會(huì)報(bào)空指針)
3.高性能算子和緩存機(jī)制
<pre style="background:white">用mapPartitionsToPair替代mapToPair,前者直接作用于一個(gè)Partition來(lái)計(jì)算悦冀,后者基于Partition中的每個(gè)元素一個(gè)個(gè)運(yùn)算</pre>
將經(jīng)常用maprdd緩存起來(lái)趋翻,避免job重復(fù)計(jì)算
4.資源動(dòng)態(tài)伸縮
spark.dynamicAllocation.enabled=true \
spark.shuffle.service.enabled=true \
spark.dynamicAllocation.executorIdleTimeout=60 \
spark.dynamicAllocation.cachedExecutorIdleTimeout=60 \
spark.dynamicAllocation.initialExecutors=2 \
spark.dynamicAllocation.maxExecutors=7 \
spark.dynamicAllocation.minExecutors=2 \
yarn-site文件配置shuffle服務(wù),添加如下配置
jar包拷貝到/usr/local/service/hadoop/share/hadoop/yarn下盒蟆,保障nm能找到對(duì)應(yīng)的類踏烙。
PS:實(shí)測(cè)過(guò)程,伸縮回來(lái)之后历等,executor無(wú)法回收回去讨惩,即使計(jì)算時(shí)間很快
Executor回收機(jī)制:
只要有一個(gè)task結(jié)束,就會(huì)判定有哪些Executor已經(jīng)沒(méi)有任務(wù)了寒屯。然后會(huì)被加入待移除列表荐捻。在放到removeTimes的時(shí)候,會(huì)把當(dāng)前時(shí)間now + executorIdleTimeoutS * 1000 作為時(shí)間戳存儲(chǔ)起來(lái)寡夹。當(dāng)調(diào)度進(jìn)程掃描這個(gè)到Executor時(shí)处面,會(huì)判定時(shí)間是不是到了,到了的話就執(zhí)行實(shí)際的remove動(dòng)作菩掏。在這個(gè)期間魂角,一旦有task再啟動(dòng),并且正好運(yùn)行在這個(gè)Executor上智绸,則又會(huì)從removeTimes列表中被移除野揪。 那么這個(gè)Executor就不會(huì)被真實(shí)的刪除了爪瓜。因?yàn)閷?shí)際運(yùn)行過(guò)程中涣觉,有12個(gè)kafka 分區(qū),會(huì)導(dǎo)致一直有task在Executor中運(yùn)行庄萎,無(wú)法觸發(fā)Executor刪除操作迹恐,源碼可見org.apache.spark. ExecutorAllocationManager
5.分離regionserver和 nm進(jìn)程挣惰,regionserver進(jìn)程在compact時(shí),會(huì)很耗資源殴边,導(dǎo)致跑在這些機(jī)器上任務(wù)執(zhí)行時(shí)間很長(zhǎng)
6.將寫hbase操作通熄,做成異步寫,并可以動(dòng)態(tài)關(guān)閉寫hbase 接口明細(xì)操作
<pre style="background:white">7.合理設(shè)置shuffle的數(shù)量找都,例如groupByKey操作唇辨,通過(guò)spark.default.parallelism參數(shù)控制,默認(rèn)是2(這樣可以有效控制foreachRdd操作時(shí)每個(gè)任務(wù)的耗時(shí)時(shí)間)實(shí)際操作過(guò)程中能耻,最好是和excutor core個(gè)數(shù)相等赏枚,配置成6會(huì)產(chǎn)生6個(gè)task</pre>
<pre style="background:white">8.sparkstreaming 任務(wù)延時(shí)監(jiān)控,在任務(wù)延遲時(shí)亡驰,能第一時(shí)間知道</pre>
<pre style="background:
white">通過(guò)注冊(cè)MaliciousDetectStreamingListener到JavaStreamingContext中,來(lái)監(jiān)控任務(wù)延時(shí)情況饿幅,并告警 @See http://www.reibang.com/p/5506cd264f4d</pre>