因?yàn)槭状螁?dòng)JOB的時(shí)候,由于冷啟動(dòng)會(huì)造成內(nèi)存使用太大士复,為了防止這種情況出現(xiàn)炭懊,限制首次處理的數(shù)據(jù)量
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=200
for example:
#!/bin/sh
TaskName="funnel"
UserName="hadoop"
cd `dirname $0`
nohup sudo -u ${UserName} /data/bigdata/spark/bin/spark-submit \
--name ${TaskName} \
--class FunnelMain \
--master yarn \
--deploy-mode cluster \
--executor-memory 2G \
--num-executors 3 \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.streaming.backpressure.initialRate=1000 \
--files /data/apps/funnel/app/conf/conf.properties \
/data/apps/funnel/app/target/apphadoop-1-jar-with-dependencies.jar conf.properties >>../log/${TaskName}.log 2>&1 &
exit 0
使用SparkStreaming集成kafka時(shí)有幾個(gè)比較重要的參數(shù):
spark.streaming.stopGracefullyOnShutdown (true / false)默認(rèn)fasle
確保在kill任務(wù)時(shí),能夠處理完最后一批數(shù)據(jù)庵朝,再關(guān)閉程序吗冤,不會(huì)發(fā)生強(qiáng)制kill導(dǎo)致數(shù)據(jù)處理中斷,沒(méi)處理完的數(shù)據(jù)丟失spark.streaming.backpressure.enabled (true / false) 默認(rèn)false
開(kāi)啟后spark自動(dòng)根據(jù)系統(tǒng)負(fù)載選擇最優(yōu)消費(fèi)速率spark.streaming.backpressure.initialRate (整數(shù))
默認(rèn)直接讀取所有 在(2)開(kāi)啟的情況下九府,限制第一次批處理應(yīng)該消費(fèi)的數(shù)據(jù)椎瘟,因?yàn)槌绦蚶鋯?dòng)隊(duì)列里面有大量積壓,防止第一次全部讀取侄旬,造成系統(tǒng)阻塞spark.streaming.kafka.maxRatePerPartition (整數(shù))
默認(rèn)直接讀取所有限制每秒每個(gè)消費(fèi)線程讀取每個(gè)kafka分區(qū)最大的數(shù)據(jù)量
注意:
只有(4)激活的時(shí)候肺蔚,每次消費(fèi)的最大數(shù)據(jù)量,就是設(shè)置的數(shù)據(jù)量儡羔,如果不足這個(gè)數(shù)婆排,就有多少讀多少,如果超過(guò)這個(gè)數(shù)字笔链,就讀取這個(gè)數(shù)字的設(shè)置的值
只有(2)+(4)激活的時(shí)候段只,每次消費(fèi)讀取的數(shù)量最大會(huì)等于(4)設(shè)置的值,最小是spark根據(jù)系統(tǒng)負(fù)載自動(dòng)推斷的值鉴扫,消費(fèi)的數(shù)據(jù)量會(huì)在這兩個(gè)范圍之內(nèi)變化根據(jù)系統(tǒng)情況赞枕,但第一次啟動(dòng)會(huì)有多少讀多少數(shù)據(jù)。此后按(2)+(4)設(shè)置規(guī)則運(yùn)行
(2)+(3)+(4)同時(shí)激活的時(shí)候,跟上一個(gè)消費(fèi)情況基本一樣炕婶,但第一次消費(fèi)會(huì)得到限制姐赡,因?yàn)槲覀冊(cè)O(shè)置第一次消費(fèi)的頻率了。