Spark Streaming資源動(dòng)態(tài)申請(qǐng)和動(dòng)態(tài)控制消費(fèi)速率原理剖析

Spark是粗粒度的,即在默認(rèn)情況下會(huì)預(yù)先分配好資源霜浴,再進(jìn)行計(jì)算。

好處是資源提前分配好蓝纲,有計(jì)算任務(wù)時(shí)就直接使用計(jì)算資源坷随,不用再考慮資源分配房铭。

不好的地方是,高峰值和低峰值時(shí)需要的資源是不一樣的温眉。資源如果是針對(duì)高峰值情況下考慮的缸匪,那勢(shì)必在低峰值情況下會(huì)有大量的資源浪費(fèi)。

Twitter最近推出了會(huì)秒殺Storm的Heron类溢,非常值得關(guān)注凌蔬。因?yàn)镠eron能有更好的資源分配、?更低的延遲闯冷。Heron在語義上兼容了Storm砂心,即原來在Storm上開發(fā)的應(yīng)用程序可以馬上在Heron上使用。Storm絕對(duì)要成為歷史了蛇耀。Heron的主要開發(fā)語言是C++辩诞、Java、Python纺涤。其API支持Java译暂。

SparkCore的入口SparkContext:

//?Optionally?scale?number?of?executors?dynamically?based?on?workload.?Exposed?for?testing.

val?dynamicAllocationEnabled?=?Utils.isDynamicAllocationEnabled(_conf)

if?(!dynamicAllocationEnabled?&&?_conf.getBoolean("spark.dynamicAllocation.enabled",?false))?{

logWarning("Dynamic?Allocation?and?num?executors?both?set,?thus?dynamic?allocation?disabled.")

}

_executorAllocationManager?=

if?(dynamicAllocationEnabled)?{

Some(new?ExecutorAllocationManager(this,?listenerBus,?_conf))

}?else?{

None

}

已經(jīng)支持資源的動(dòng)態(tài)分配。

Utils.isDynamicAllocationEnabled:

/**

*?Return?whether?dynamic?allocation?is?enabled?in?the?given?conf

*?Dynamic?allocation?and?explicitly?setting?the?number?of?executors?are?inherently

*?incompatible.?In?environments?where?dynamic?allocation?is?turned?on?by?default,

*?the?latter?should?override?the?former?(SPARK-9092).

*/

defisDynamicAllocationEnabled(conf:?SparkConf):?Boolean?=?{

conf.getBoolean("spark.dynamicAllocation.enabled",?false)?&&

conf.getInt("spark.executor.instances",?0)?==?0

}

ExecutorAllocationManager:

...

//?Clock?used?to?schedule?when?executors?should?be?added?and?removed

private?var?clock:?Clock?=?newSystemClock()

...

有個(gè)時(shí)鐘撩炊,基于時(shí)鐘的定時(shí)器會(huì)不斷的掃描Executor的情況外永,每過一段時(shí)間去看看資源情況。

Master.schedule:

/**

*?Schedule?the?currently?available?resources?among?waiting?apps.?This?method?will?be?called

*?every?time?a?new?app?joins?or?resource?availability?changes.

*/

private?defschedule():?Unit?=?{

if?(state?!=?RecoveryState.ALIVE)?{?return?}

//?Drivers?take?strict?precedence?over?executors

val?shuffledWorkers?=?Random.shuffle(workers)?//?Randomization?helps?balance?drivers

for?(worker?<-?shuffledWorkers?if?worker.state?==?WorkerState.ALIVE)?{

for?(driver?<-?waitingDrivers)?{

if?(worker.memoryFree?>=?driver.desc.mem?&&?worker.coresFree?>=?driver.desc.cores)?{

launchDriver(worker,?driver)

waitingDrivers?-=?driver

}

}

}

startExecutorsOnWorkers()

}

原先默認(rèn)的用于分配資源拧咳。

ExecutorAllocaionManager:

//?Polling?loop?interval?(ms)

private?val?intervalMillis:?Long?=?100

...

//?A?timestamp?for?each?executor?of?when?the?executor?should?be?removed,?indexed?by?the?ID

//?This?is?set?when?an?executor?is?no?longer?running?a?task,?or?when?it?first?registers

private?valremoveTimes=?new?mutable.HashMap[String,?Long]

...

//?Clock?used?to?schedule?when?executors?should?be?added?and?removed

private?var?clock:?Clock?=?new?SystemClock()

...

//?Executor?that?handles?the?scheduling?task.

private?val?executor?=

ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")

...

removeTimes中有Executor的ID伯顶。

executor中有定時(shí)器,不斷執(zhí)行schedule骆膝。默認(rèn)周期是intervalMillis(100ms)

ExecutorAllocaionManager.start:

/**

*?Register?for?scheduler?callbacks?to?decide?when?to?add?and?remove?executors,?and?start

*?the?scheduling?task.

*/

defstart():?Unit?=?{

listenerBus.addListener(listener)

val?scheduleTask?=?newRunnable()?{

override?def?run():?Unit?=?{

try?{

schedule()

}?catch?{

case?ct:?ControlThrowable?=>

throw?ct

case?t:?Throwable?=>

logWarning(s"Uncaught?exception?in?thread?${Thread.currentThread().getName}",?t)

}

}

}

executor.scheduleAtFixedRate(scheduleTask,?0,intervalMillis,?TimeUnit.MILLISECONDS)

}

ExecutorAllocaionManager.schedule:

/**

*?This?is?called?at?a?fixed?interval?to?regulate?the?number?of?pending?executor?requests

*?and?number?of?executors?running.

*

*?First,?adjust?our?requested?executors?based?on?the?add?time?and?our?current?needs.

*?Then,?if?the?remove?time?for?an?existing?executor?has?expired,?kill?the?executor.

*

*?This?is?factored?out?into?its?own?method?for?testing.

*/

private?defschedule():?Unit?=?synchronized?{

val?now?=?clock.getTimeMillis

updateAndSyncNumExecutorsTarget(now)

removeTimes.retain?{?case?(executorId,?expireTime)?=>

val?expired?=?now?>=?expireTime

if?(expired)?{

initializing?=?false

removeExecutor(executorId)

}

!expired

}

}

這個(gè)內(nèi)部方法會(huì)被周期性的觸發(fā)執(zhí)行祭衩。

實(shí)際生產(chǎn)環(huán)境下,動(dòng)態(tài)資源分配可能要自己做好定制阅签。

SparkStreaming的動(dòng)態(tài)調(diào)整的復(fù)雜之處在于掐暮,即使在batch duration內(nèi)剛做了調(diào)整,但可能本batch duration馬上就會(huì)過期愉择。

你可以考慮改變執(zhí)行周期(intervalMills)劫乱,來動(dòng)態(tài)調(diào)整织中。在一個(gè)batchduration中要對(duì)數(shù)據(jù)分片锥涕,可以算一下已擁有閑置的core,如果不夠狭吼,則可以申請(qǐng)?jiān)黾覧xecutor层坠,從而把任務(wù)分配到新增的Executor。

也可以考量針對(duì)上一個(gè)batchduration的資源需求情況刁笙,因?yàn)榉逯党霈F(xiàn)時(shí)破花,往往會(huì)延續(xù)在多個(gè)連續(xù)的batch duration中谦趣。考量上一個(gè)batch duration的情況座每,用某種算法來動(dòng)態(tài)調(diào)整后續(xù)的batch duration的資源前鹅。修改Spark Streaming可以設(shè)計(jì)StreamingContext的新子類。

其實(shí)前面的動(dòng)態(tài)資源分配的定制方式做起來不容易峭梳,可能仍不太合適舰绘。

備注:

資料來源于:DT_大數(shù)據(jù)夢(mèng)工廠(Spark發(fā)行版本定制)

更多私密內(nèi)容,請(qǐng)關(guān)注微信公眾號(hào):DT_Spark

如果您對(duì)大數(shù)據(jù)Spark感興趣葱椭,可以免費(fèi)聽由王家林老師每天晚上20:00開設(shè)的Spark永久免費(fèi)公開課捂寿,地址YY房間號(hào):68917580

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市孵运,隨后出現(xiàn)的幾起案子秦陋,更是在濱河造成了極大的恐慌,老刑警劉巖治笨,帶你破解...
    沈念sama閱讀 218,640評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件驳概,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡大磺,警方通過查閱死者的電腦和手機(jī)抡句,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來杠愧,“玉大人待榔,你說我怎么就攤上這事×骷茫” “怎么了锐锣?”我有些...
    開封第一講書人閱讀 165,011評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長绳瘟。 經(jīng)常有香客問我雕憔,道長,這世上最難降的妖魔是什么糖声? 我笑而不...
    開封第一講書人閱讀 58,755評(píng)論 1 294
  • 正文 為了忘掉前任斤彼,我火速辦了婚禮,結(jié)果婚禮上蘸泻,老公的妹妹穿的比我還像新娘琉苇。我一直安慰自己,他們只是感情好悦施,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,774評(píng)論 6 392
  • 文/花漫 我一把揭開白布并扇。 她就那樣靜靜地躺著,像睡著了一般抡诞。 火紅的嫁衣襯著肌膚如雪穷蛹。 梳的紋絲不亂的頭發(fā)上土陪,一...
    開封第一講書人閱讀 51,610評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音肴熏,去河邊找鬼鬼雀。 笑死,一個(gè)胖子當(dāng)著我的面吹牛蛙吏,可吹牛的內(nèi)容都是我干的取刃。 我是一名探鬼主播,決...
    沈念sama閱讀 40,352評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼出刷,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼璧疗!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起馁龟,我...
    開封第一講書人閱讀 39,257評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤崩侠,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后坷檩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體却音,經(jīng)...
    沈念sama閱讀 45,717評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,894評(píng)論 3 336
  • 正文 我和宋清朗相戀三年矢炼,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了系瓢。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,021評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡句灌,死狀恐怖夷陋,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情胰锌,我是刑警寧澤骗绕,帶...
    沈念sama閱讀 35,735評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站资昧,受9級(jí)特大地震影響酬土,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜格带,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,354評(píng)論 3 330
  • 文/蒙蒙 一撤缴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧叽唱,春花似錦屈呕、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,936評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽芥吟。三九已至侦铜,卻和暖如春专甩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背钉稍。 一陣腳步聲響...
    開封第一講書人閱讀 33,054評(píng)論 1 270
  • 我被黑心中介騙來泰國打工涤躲, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人贡未。 一個(gè)月前我還...
    沈念sama閱讀 48,224評(píng)論 3 371
  • 正文 我出身青樓种樱,卻偏偏與公主長得像,于是被迫代替她去往敵國和親俊卤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子嫩挤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,974評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容