Spark is coarse-grained: by default, resources are allocated up front, before any computation runs.
The advantage is that, with resources pre-allocated, tasks can use them immediately when work arrives, with no per-task allocation overhead.
The drawback is that peak and off-peak loads need very different amounts of resources. If the allocation is sized for the peak, a large share of it will inevitably sit idle off-peak.
Twitter recently released Heron, pitched as a successor to Storm and well worth watching, because Heron promises better resource allocation and lower latency. Heron is semantically compatible with Storm, so applications originally developed on Storm can run on Heron right away, and it may well displace Storm over time. Heron is implemented mainly in C++, Java, and Python; its API supports Java.
The entry point of Spark Core is SparkContext:
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
So SparkContext already supports dynamic resource allocation.
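To actually turn this on, an application sets the relevant properties on its SparkConf before creating the SparkContext. A minimal sketch (the property names are standard Spark configuration keys; note that setting spark.executor.instances would disable dynamic allocation, per the check above):

```scala
import org.apache.spark.SparkConf

// Sketch: enable dynamic allocation instead of a fixed executor count.
// spark.executor.instances must NOT be set, or dynamic allocation is disabled.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // external shuffle service is required
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
```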
Utils.isDynamicAllocationEnabled:
/**
 * Return whether dynamic allocation is enabled in the given conf
 * Dynamic allocation and explicitly setting the number of executors are inherently
 * incompatible. In environments where dynamic allocation is turned on by default,
 * the latter should override the former (SPARK-9092).
 */
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
  conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
    conf.getInt("spark.executor.instances", 0) == 0
}
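The rule itself is easy to mirror: dynamic allocation counts as enabled only when the flag is set and no fixed executor count was requested. A standalone sketch of the same check over a plain Map (a hypothetical stand-in for SparkConf, not Spark API):

```scala
// Hypothetical stand-in for SparkConf: a plain key/value map.
def isDynamicAllocationEnabled(conf: Map[String, String]): Boolean = {
  conf.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean &&
    conf.getOrElse("spark.executor.instances", "0").toInt == 0
}
```

Setting both the flag and a fixed executor count yields false, matching the SPARK-9092 behavior quoted above.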
ExecutorAllocationManager:
...
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new SystemClock()
...
There is a clock, and a timer driven by it periodically scans the executors, checking the resource situation at each interval.
Master.schedule:
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }

  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
}
This is the original, default path for allocating resources in the standalone Master.
ExecutorAllocationManager:
// Polling loop interval (ms)
private val intervalMillis: Long = 100
...
// A timestamp for each executor of when the executor should be removed, indexed by the ID
// This is set when an executor is no longer running a task, or when it first registers
private val removeTimes = new mutable.HashMap[String, Long]
...
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new SystemClock()
...
// Executor that handles the scheduling task.
private val executor =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
...
removeTimes maps each executor's ID to the timestamp at which that executor should be removed.
executor runs a timer that repeatedly invokes schedule(); the default period is intervalMillis (100 ms).
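The same pattern can be reproduced directly with the JDK's scheduled executor. A minimal self-contained sketch of a daemon single-thread timer firing every 100 ms (ThreadUtils is a Spark-internal helper, so here the thread factory is built by hand; the counter stands in for schedule()):

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for the work schedule() would do: just count invocations.
val ticks = new AtomicInteger(0)

// Daemon single-thread scheduled executor, in the spirit of
// ThreadUtils.newDaemonSingleThreadScheduledExecutor.
val factory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "demo-dynamic-executor-allocation")
    t.setDaemon(true)
    t
  }
}
val scheduler = Executors.newSingleThreadScheduledExecutor(factory)

// Fire immediately, then roughly every 100 ms, like the allocation manager's timer.
scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = ticks.incrementAndGet()
}, 0, 100, TimeUnit.MILLISECONDS)

Thread.sleep(350)
scheduler.shutdown()
```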
ExecutorAllocationManager.start:
/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
ExecutorAllocationManager.schedule:
/**
 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 *
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 *
 * This is factored out into its own method for testing.
 */
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now)

  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}
This internal method is triggered periodically.
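The retain call does the interesting work here: it walks the map, fires the removal side effect for expired entries, and keeps only the rest. A self-contained sketch of the same idiom, with fake executor IDs and a fixed "now" (retain was renamed filterInPlace in Scala 2.13):

```scala
import scala.collection.mutable

// removeTimes-style map: executor ID -> time (ms) at which it should be removed.
val removeTimes = mutable.HashMap("exec-1" -> 1000L, "exec-2" -> 5000L, "exec-3" -> 2000L)
val removed = mutable.ArrayBuffer.empty[String]

val now = 3000L // pretend clock.getTimeMillis returned this

removeTimes.retain { case (executorId, expireTime) =>
  val expired = now >= expireTime
  if (expired) removed += executorId // stand-in for removeExecutor(executorId)
  !expired
}
```

After this runs, exec-1 and exec-3 (expiry times in the past) have been "removed", and only exec-2 survives in the map.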
In real production environments, you will likely need to customize dynamic resource allocation yourself.
What makes dynamic adjustment hard in Spark Streaming is that even if you adjust resources within a batch duration, that batch duration may expire almost immediately afterward.
One option is to change the polling period (intervalMillis) to adjust dynamically. Within a batch duration, when partitioning the data, count the cores currently idle; if they are not enough, request additional executors and assign tasks to the new executors.
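As a sketch of that sizing step: given the number of partitions this batch must process, the cores currently idle, and the cores per executor, compute how many extra executors to request. The helper name is hypothetical, not Spark API; in a real job the request itself would go through the developer API SparkContext.requestExecutors:

```scala
// Hypothetical sizing helper: how many extra executors does this batch need?
def extraExecutorsNeeded(numPartitions: Int, idleCores: Int, coresPerExecutor: Int): Int = {
  val missingCores = numPartitions - idleCores
  if (missingCores <= 0) 0
  else (missingCores + coresPerExecutor - 1) / coresPerExecutor // ceiling division
}
```

For example, 10 partitions with 4 idle cores and 2 cores per executor means 6 missing cores, i.e. 3 extra executors.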
Another option is to size resources based on the previous batch duration's demand, since a load peak usually spans several consecutive batch durations. Apply some algorithm to the previous batch duration's statistics to adjust the resources of subsequent batch durations. To modify Spark Streaming this way, you could design a new subclass of StreamingContext.
In practice, the customization approaches sketched above are not easy to implement, and may still not be a good fit.
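One hedged sketch of such a policy: track the processing times of recent batches and scale up when they approach the batch duration, scale down when there is ample headroom. The thresholds and the function itself are purely illustrative, not Spark Streaming's actual backpressure or scaling logic:

```scala
// Illustrative policy: decide a scaling action from recent batch processing times.
// Returns +1 (request more executors), -1 (release some), or 0 (hold steady).
def scalingDecision(recentProcessingMs: Seq[Long], batchDurationMs: Long): Int = {
  if (recentProcessingMs.isEmpty) 0
  else {
    val avg = recentProcessingMs.sum.toDouble / recentProcessingMs.size
    if (avg > 0.9 * batchDurationMs) 1       // falling behind: scale up
    else if (avg < 0.5 * batchDurationMs) -1 // ample headroom: scale down
    else 0
  }
}
```

In a real deployment the processing times could be collected from a StreamingListener, and the decision applied via the executor request/kill developer APIs.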
Note:
Source: DT Big Data Dream Factory (Spark release customization series).