目錄
1.系統(tǒng)架構(gòu)
2.環(huán)境搭建
2.1本地環(huán)境下kafka批量導(dǎo)入數(shù)據(jù)
2.2 kafka-manager的安裝與配置
3.1 Spark Streaming 性能調(diào)優(yōu)(一): 解決并行度
3.2 Spark Streaming 性能調(diào)優(yōu)(二): 解決task傾斜
前一篇文章解決了task數(shù)據(jù)傾斜的問(wèn)題, 但是將代碼提交放到集群環(huán)境上測(cè)試的時(shí)候卻發(fā)現(xiàn), 性能并沒(méi)有多大改善, 但是通過(guò)Spark ui可以看到, 各個(gè)task分配到的數(shù)據(jù)量確實(shí)是比較平均的, 而奇怪的就在于, 這些數(shù)據(jù)量平均的task卻都集中在某一兩臺(tái)機(jī)器上運(yùn)行了.
task傾斜
上面我遇到的這種情況其實(shí)就task傾斜, 各個(gè)task的數(shù)據(jù)量很平均, 但都集中在某一兩機(jī)器上, 導(dǎo)致這一兩臺(tái)機(jī)器性能被榨干, 其他機(jī)器在圍觀.
之后經(jīng)過(guò)了幾天理解Spark的task分配機(jī)制, 最后終于發(fā)現(xiàn), 我目前遇到的task傾斜都是由于Spark的數(shù)據(jù)本地化特性導(dǎo)致的.(/(ㄒoㄒ)/~~)
數(shù)據(jù)本地化
數(shù)據(jù)本地化有3個(gè)級(jí)別:
- PROCESS_LOCAL: 該級(jí)別由參數(shù)spark.locality.wait.process指定
- NODE_LOCAL: 該級(jí)別由參數(shù)spark.locality.wait.node指定
- RACK_LOCAL: 該級(jí)別由參數(shù)spark.locality.wait.rack指定
上面三個(gè)參數(shù), 如果不指定值的話, 則默認(rèn)取值于參數(shù)spark.locality.wait的值, 該值默認(rèn)為3s
Spark在task的調(diào)度過(guò)程中, 會(huì)優(yōu)化把task分配給持有數(shù)據(jù)/緩存的同一個(gè)executor進(jìn)程, 如果executor進(jìn)程正在執(zhí)行別的任務(wù), 則會(huì)等待spark.locality.wait.process秒, 如果等待了這么多秒之后該executor還是不可用, 則數(shù)據(jù)本地化程度降低一級(jí), 選擇分配到統(tǒng)一節(jié)點(diǎn)的其他executor進(jìn)程, 如果還是不可用, 則選擇同一個(gè)集群的其他executor.
加入通過(guò)spark ui的executor頁(yè)發(fā)現(xiàn)某個(gè)executor的數(shù)據(jù)input量特別大, 則極有可能會(huì)發(fā)生task傾斜.
解決辦法
解決辦法其實(shí)就是適當(dāng)降低數(shù)據(jù)本地化參數(shù)的值, 具體的值是多少需要視實(shí)際情況.
由于我公司的集群可以開(kāi)16個(gè)executor, 而發(fā)生task傾斜的時(shí)候每個(gè)task運(yùn)行時(shí)間大概也就2~3s, 所以我設(shè)置了spark.locality.wait.process=200ms, spark.locality.wait.node=200ms, 其他用默認(rèn)值.
修改配置參數(shù)后重新提交spark程序, 確實(shí)集群的資源終于能利用起來(lái)了, 原本要1分鐘的窗口間隔, 要跑1min~1.5min, 現(xiàn)在只需30~40s就跑完了, 終于不會(huì)再出現(xiàn)延時(shí)嚴(yán)重的情況了!!!
((≧▽≦)/啦啦啦)